diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 59a71ffe70c0..da31414a4b63 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -3324,6 +3324,7 @@ dependencies = [ "chrono", "clap", "exitcode", + "git-version", "humantime", "log", "more-asserts", diff --git a/cmd/zfs_object_agent/util/src/message.rs b/cmd/zfs_object_agent/util/src/message.rs index 076e0927b37e..e82d6a2b8fe9 100644 --- a/cmd/zfs_object_agent/util/src/message.rs +++ b/cmd/zfs_object_agent/util/src/message.rs @@ -139,6 +139,10 @@ pub const TYPE_ZCACHE_STATS: &str = "zcache stats"; pub const TYPE_ZCACHE_STATUS: &str = "zcache status"; pub const TYPE_ADD_DISK: &str = "add disk"; pub const TYPE_EXPAND_DISK: &str = "expand disk"; +pub const TYPE_REMOVE_DISK: &str = "remove disk"; +pub const TYPE_CANCEL_DISK_REMOVAL: &str = "cancel disk removal"; +pub const TYPE_PAUSE_DISK_REMOVALS: &str = "pause disk removals"; +pub const TYPE_RESUME_DISK_REMOVALS: &str = "resume disk removals"; pub const TYPE_SYNC_CHECKPOINT: &str = "sync checkpoint"; pub const TYPE_INITIATE_MERGE: &str = "initiate merge"; @@ -152,6 +156,11 @@ pub struct ExpandDiskRequest { pub path: PathBuf, } +#[derive(Serialize, Deserialize, Debug)] +pub struct RemoveDiskRequest { + pub path: PathBuf, +} + #[derive(Serialize, Deserialize, Debug)] pub struct ExpandDiskResponse { pub new_size: u64, diff --git a/cmd/zfs_object_agent/util/src/vec_ext.rs b/cmd/zfs_object_agent/util/src/vec_ext.rs index 4617605316ae..a9ad2312f744 100644 --- a/cmd/zfs_object_agent/util/src/vec_ext.rs +++ b/cmd/zfs_object_agent/util/src/vec_ext.rs @@ -243,7 +243,7 @@ pub struct VecMap { impl VecMap where - K: Into + Copy, + K: From + Into + Copy, { /// Returns old value (or None if not present) pub fn insert(&mut self, key: K, value: V) -> Option { @@ -285,6 +285,10 @@ where self.vec[index].take() } + pub fn contains_key(&self, key: &K) -> bool { + self.get(*key).is_some() + } + pub fn values(&self) -> impl Iterator { self.vec.iter().filter_map(|v| v.as_ref()) } @@ -313,6 +317,38 @@ where self.vec[range].iter_mut().filter_map(|v| v.as_mut()) } + pub fn iter(&self) -> impl DoubleEndedIterator { + self.vec + .iter() + .enumerate() + .flat_map(|(k, v)| v.as_ref().map(|v| (k.into(), v))) + } + + pub fn iter_mut(&mut self) -> impl DoubleEndedIterator { + self.vec + .iter_mut() + .enumerate() + .flat_map(|(k, v)| v.as_mut().map(|v| (k.into(), v))) + } + + pub fn keys(&self) -> impl DoubleEndedIterator + '_ { + self.iter().map(|(k, _)| k) + } + + pub fn retain(&mut self, mut f: F) + where + F: FnMut(&K, &mut V) -> bool, + { + for k in 0..self.vec.len() { + let k = k.into(); + if let Some(v) = self.get_mut(k) { + if !f(&k, v) { + self.remove(k); + } + } + } + } + /// Returns the number of elements in the map. 
pub fn len(&self) -> usize { self.num_entries @@ -322,3 +358,31 @@ where self.len() == 0 } } + +impl VecMap +where + K: From + Into + Copy, + V: Default, +{ + pub fn get_mut_or_default(&mut self, key: K) -> &mut V { + if self.contains_key(&key) { + return self.get_mut(key).unwrap(); + } + let old = self.insert(key, Default::default()); + assert!(old.is_none()); + self.get_mut(key).unwrap() + } +} + +impl FromIterator<(K, V)> for VecMap +where + K: From + Into + Copy, +{ + fn from_iter>(iter: T) -> VecMap { + let mut map = VecMap::default(); + for (k, v) in iter.into_iter() { + map.insert(k, v); + } + map + } +} diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs index 6e1078f59c2f..63a6e47cd913 100644 --- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs +++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs @@ -386,8 +386,8 @@ pub enum CacheStatCounter { SpeculativeBufferBytesAvailable, SlabCapacity, AvailableSpace, - AvailableBlocksSize, - AvailableSlabsSize, + FreeBlocksSize, + FreeSlabsSize, } impl Display for CacheStatCounter { @@ -447,8 +447,8 @@ impl Sub<&Self> for &CacheStats { | CacheStatCounter::SpeculativeBufferBytesAvailable | CacheStatCounter::SlabCapacity | CacheStatCounter::AvailableSpace - | CacheStatCounter::AvailableSlabsSize - | CacheStatCounter::AvailableBlocksSize => { + | CacheStatCounter::FreeSlabsSize + | CacheStatCounter::FreeBlocksSize => { *diff_stat = self_stat.clone(); } // Everything else should be subtracted diff --git a/cmd/zfs_object_agent/zcache/Cargo.toml b/cmd/zfs_object_agent/zcache/Cargo.toml index 4290009e0ea6..f9125b59512e 100644 --- a/cmd/zfs_object_agent/zcache/Cargo.toml +++ b/cmd/zfs_object_agent/zcache/Cargo.toml @@ -13,6 +13,7 @@ chrono = "0.4" async-trait = "0.1.51" clap={version="3.1.6",features=["derive"]} exitcode = "1.1.2" +git-version = "0.3.5" humantime = "2.1.0" log = "0.4" more-asserts = "0.2.1" diff --git a/cmd/zfs_object_agent/zcache/src/main.rs b/cmd/zfs_object_agent/zcache/src/main.rs index 7b303fbf2c8b..8c574ea98cfc 100644 --- a/cmd/zfs_object_agent/zcache/src/main.rs +++ b/cmd/zfs_object_agent/zcache/src/main.rs @@ -12,6 +12,7 @@ mod iostat; mod labelclear; mod list; mod remote_channel; +mod remove; mod stats; mod status; mod subcommand; @@ -22,17 +23,24 @@ use std::path::PathBuf; use anyhow::Result; use clap::Parser; use clap::Subcommand; +use git_version::git_version; use hits::ClearHitData; use log::*; use subcommand::ZcacheSubCommand; +static GIT_VERSION: &str = git_version!( + fallback = match option_env!("CARGO_ZOA_GITREV") { + Some(value) => value, + None => "unknown", + } +); + fn main() -> Result<()> { async_main() } #[derive(Parser)] -// XXX other commands use a git derived version here -#[clap(version = "1.1")] +#[clap(version=GIT_VERSION)] #[clap(name = "zcache")] #[clap(about = "ZFS ZettaCache Command")] #[clap(propagate_version = true)] @@ -62,6 +70,7 @@ enum Commands { Stats(stats::Stats), Add(add::Add), Expand(expand::Expand), + Remove(remove::Remove), Sync(sync::Sync), Labelclear(labelclear::Labelclear), Status(status::Status), @@ -80,6 +89,7 @@ impl Commands { Commands::Stats(stats) => stats, Commands::Add(add) => add, Commands::Expand(expand) => expand, + Commands::Remove(remove) => remove, Commands::Sync(sync) => sync, Commands::Labelclear(labelclear) => labelclear, Commands::ClearHitData(clear_hit_data) => clear_hit_data, @@ -104,8 +114,26 @@ mod test_clap { use super::*; + fn neg(s: &str) { + 
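The VecMap additions above (contains_key, iter/iter_mut/keys, retain, get_mut_or_default, FromIterator) are easiest to see on a stand-alone model. The sketch below is not the util crate's implementation, just a minimal Vec<Option<V>>-backed map keyed directly by usize that mirrors the retain()/get_mut_or_default() semantics:

// Illustrative stand-alone model only (not util::VecMap itself).
#[derive(Default)]
struct MiniVecMap<V> {
    vec: Vec<Option<V>>,
}

impl<V> MiniVecMap<V> {
    fn insert(&mut self, key: usize, value: V) -> Option<V> {
        if key >= self.vec.len() {
            self.vec.resize_with(key + 1, || None);
        }
        self.vec[key].replace(value)
    }
    fn remove(&mut self, key: usize) -> Option<V> {
        self.vec.get_mut(key).and_then(|slot| slot.take())
    }
    fn get_mut(&mut self, key: usize) -> Option<&mut V> {
        self.vec.get_mut(key).and_then(|slot| slot.as_mut())
    }
    /// Keep only entries for which `f` returns true (as in VecMap::retain above).
    fn retain(&mut self, mut f: impl FnMut(usize, &mut V) -> bool) {
        for k in 0..self.vec.len() {
            if let Some(v) = self.get_mut(k) {
                if !f(k, v) {
                    self.remove(k);
                }
            }
        }
    }
    /// Insert a default value on first use (as in VecMap::get_mut_or_default above).
    fn get_mut_or_default(&mut self, key: usize) -> &mut V
    where
        V: Default,
    {
        if self.get_mut(key).is_none() {
            assert!(self.insert(key, V::default()).is_none());
        }
        self.get_mut(key).unwrap()
    }
}

fn main() {
    let mut map = MiniVecMap::<u64>::default();
    map.insert(3, 30);
    *map.get_mut_or_default(5) += 1;
    map.retain(|k, _| k != 3);
    assert!(map.get_mut(3).is_none());
    assert_eq!(map.remove(5), Some(1));
}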
assert!(Cli::try_parse_from(s.split_whitespace()).is_err()); + } + #[test] fn test_debug_asserts() { Cli::command().debug_assert(); } + + #[test] + fn test_zcache_remove_conflicts() { + neg("zcache remove --pause --resume"); + neg("zcache remove --pause --cancel"); + neg("zcache remove --pause -s"); + neg("zcache remove --pause disk0"); + neg("zcache remove --resume --cancel"); + neg("zcache remove --resume -s"); + neg("zcache remove --resume -s"); + neg("zcache remove --resume disk0"); + neg("zcache remove --cancel"); + neg("zcache remove"); + } } diff --git a/cmd/zfs_object_agent/zcache/src/remove.rs b/cmd/zfs_object_agent/zcache/src/remove.rs new file mode 100644 index 000000000000..d6cda783415a --- /dev/null +++ b/cmd/zfs_object_agent/zcache/src/remove.rs @@ -0,0 +1,74 @@ +//! `zcache remove` subcommand + +use std::path::PathBuf; + +use anyhow::Result; +use async_trait::async_trait; +use clap::Parser; +use util::message::RemoveDiskRequest; +use util::message::TYPE_CANCEL_DISK_REMOVAL; +use util::message::TYPE_PAUSE_DISK_REMOVALS; +use util::message::TYPE_REMOVE_DISK; +use util::message::TYPE_RESUME_DISK_REMOVALS; +use util::writeln_stdout; + +use crate::remote_channel::RemoteChannel; +use crate::subcommand::ZcacheSubCommand; + +#[derive(Parser)] +#[clap(about = "Remove disks from the ZettaCache.")] +pub struct Remove { + /// Pause ongoing removals + #[clap( + long, + conflicts_with_all = &["resume", "cancel", "path"] + )] + pause: bool, + + /// Resume paused removals + #[clap( + long, + conflicts_with_all = &["cancel", "path"] + )] + resume: bool, + + /// Stop and cancel in-progress removal + #[clap(short = 's', long)] + cancel: bool, + + /// Supplied disk for the operation issued + #[clap(required_unless_present_any = &["pause", "resume"])] + path: Option, +} + +#[async_trait] +impl ZcacheSubCommand for Remove { + async fn invoke(&self) -> Result<()> { + let mut remote = RemoteChannel::new(true).await?; + if self.pause { + remote.call(TYPE_PAUSE_DISK_REMOVALS, None).await?; + writeln_stdout!("Removals paused"); + } else if self.resume { + remote.call(TYPE_RESUME_DISK_REMOVALS, None).await?; + writeln_stdout!("Resuming removals"); + } else if self.cancel { + let path = self.path.clone().unwrap(); + let request = RemoveDiskRequest { path: path.clone() }; + remote + .call( + TYPE_CANCEL_DISK_REMOVAL, + Some(nvpair::to_nvlist(&request).unwrap()), + ) + .await?; + writeln_stdout!("Cancelled the removal of {path:?}"); + } else { + let path = self.path.clone().unwrap(); + let request = RemoveDiskRequest { path: path.clone() }; + remote + .call(TYPE_REMOVE_DISK, Some(nvpair::to_nvlist(&request).unwrap())) + .await?; + writeln_stdout!("Removing {path:?}"); + } + Ok(()) + } +} diff --git a/cmd/zfs_object_agent/zcache/src/stats.rs b/cmd/zfs_object_agent/zcache/src/stats.rs index 3695f1cfdcbc..a6174e58ca51 100644 --- a/cmd/zfs_object_agent/zcache/src/stats.rs +++ b/cmd/zfs_object_agent/zcache/src/stats.rs @@ -256,11 +256,11 @@ impl StatsDisplay { if self.show_block_allocator { let available_space = values.value(AvailableSpace); let slab_capacity = values.value(SlabCapacity); - let free_blocks_size = values.value(AvailableBlocksSize); - let free_slabs_size = values.value(AvailableSlabsSize); - let block_allocator_allocated = slab_capacity - free_blocks_size - free_slabs_size; + let free_blocks_size = values.value(FreeBlocksSize); + let free_slabs_size = values.value(FreeSlabsSize); + let allocated_space = slab_capacity - free_blocks_size - free_slabs_size; - 
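For reference, the invocations accepted by the new subcommand, as implied by the clap attributes and the negative tests above (disk paths here are placeholders):

    zcache remove <disk>            start removing <disk> from the zettacache
    zcache remove --cancel <disk>   (or -s <disk>) cancel the in-progress removal of <disk>
    zcache remove --pause           pause all ongoing removals (takes no path)
    zcache remove --resume          resume previously paused removals (takes no path)

--pause, --resume, and --cancel are mutually exclusive, and a bare `zcache remove` with no arguments is rejected.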
self.display_bytes(block_allocator_allocated as f64); + self.display_bytes(allocated_space as f64); self.display_bytes(available_space as f64); self.display_percent(free_blocks_size as f64, slab_capacity as f64); self.display_percent(free_slabs_size as f64, slab_capacity as f64); diff --git a/cmd/zfs_object_agent/zettacache/src/base_types.rs b/cmd/zfs_object_agent/zettacache/src/base_types.rs index f0aa6ffcc1d7..7e715281879f 100644 --- a/cmd/zfs_object_agent/zettacache/src/base_types.rs +++ b/cmd/zfs_object_agent/zettacache/src/base_types.rs @@ -91,6 +91,19 @@ impl DiskId { pub fn index(self) -> usize { self.0 as usize } + pub fn next(&self) -> DiskId { + DiskId(self.0 + 1) + } +} +impl From for usize { + fn from(val: DiskId) -> Self { + val.index() + } +} +impl From for DiskId { + fn from(val: usize) -> Self { + DiskId::new(val) + } } #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index f720ea9e5cdc..a64d4ab37e1d 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -49,6 +49,7 @@ use util::DeviceEntry; use util::DeviceList; use util::DeviceStatus; use util::From64; +use util::VecMap; use uuid::Uuid; use crate::base_types::DiskId; @@ -138,7 +139,7 @@ impl<'a> Drop for OpInProgress<'a> { #[derive(Debug)] pub struct BlockAccess { sector_size: usize, - disks: RwLock>, + disks: RwLock>, readonly: bool, timebase: Instant, } @@ -149,7 +150,17 @@ pub struct Disk { file: Arc, path: PathBuf, + + // In most OSs it is common for a device to be represented by multiple files in different + // subdirectories under /dev, and subdirectories created by different cloud providers may + // complicate things even more. Therefore we remember each device path as the path that was + // supplied from the user so that we can display it back to them in a way that they + // understand. That said, we want to stay flexible and allow the user to specify any path + // that resolves to the same device. So we derive the canonical name so devices in the config + // can be matched with devices supplied by the user regardless of the path they are + // specified. canonical_path: PathBuf, + size: Mutex, sector_size: usize, #[derivative(Debug = "ignore")] @@ -589,9 +600,14 @@ impl Disk { // we can use "glommio" to use io_uring for much lower overheads. Or SPDK // (which can use io_uring or nvme hardware directly). 
impl BlockAccess { - pub fn new(disks: Vec, readonly: bool) -> Self { - let sector_size = disks - .iter() + pub fn new(disks: &BTreeMap, readonly: bool) -> Result { + let mut disk_map: VecMap = Default::default(); + for (&disk_id, disk_path) in disks { + disk_map.insert(disk_id, Disk::new(disk_path, false)?); + } + + let sector_size = disk_map + .values() .reduce(|a, b| { assert_eq!(a.sector_size, b.sector_size); a @@ -599,34 +615,40 @@ impl BlockAccess { .unwrap() .sector_size; - BlockAccess { + Ok(BlockAccess { sector_size, - disks: RwLock::new(disks), + disks: RwLock::new(disk_map), readonly, timebase: Instant::now(), - } + }) } pub fn add_disk(&self, disk: Disk) -> Result { let mut disks = self.disks.write().unwrap(); - for existing_disk in disks.iter() { - if disk.canonical_path == existing_disk.canonical_path { - return Err(anyhow!( - "disk {:?} ({:?}) is already part of the zettacache", - disk.path, - disk.canonical_path, - )); - } + if disks + .values() + .any(|existing_disk| existing_disk.canonical_path == disk.canonical_path) + { + return Err(anyhow!( + "disk {:?} ({:?}) is already part of the zettacache", + disk.path, + disk.canonical_path, + )); } - let id = DiskId::new(disks.len()); - disks.push(disk); + let id = match disks.keys().next_back() { + Some(id) => id.next(), + None => DiskId::new(0), + }; + disks.insert(id, disk); Ok(id) } // Returns the number of bytes added to the disk. pub fn expand_disk(&self, disk: DiskId) -> Result { let disks = self.disks.read().unwrap(); - let disk = &disks[disk.index()]; + let disk = disks + .get(disk) + .ok_or_else(|| anyhow!("cannot expand removed disk"))?; let (_, new_size) = disk_sizes(&disk.file)?; let mut size = disk.size.lock().unwrap(); let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| { @@ -643,11 +665,21 @@ impl BlockAccess { }) } - /// Note: In the future we'll support device removal in which case the - /// DiskId's will probably not be sequential. By using this accessor we - /// need not assume anything about the values inside the DiskId's. + pub fn remove_disk(&self, disk: DiskId) { + let mut disks = self.disks.write().unwrap(); + let removed = disks.remove(disk); + assert!(removed.is_some()); + } + + /// Accessor function that does not assume anything about the value in of the DiskIds. + /// Note: after device removal, DiskIds may not be sequential. 
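Because add_disk() above now derives the new id from the highest existing key rather than from the length of a Vec, DiskIds stay stable across removals and are never reused. A minimal sketch of that allocation rule over a plain BTreeMap (names are illustrative, not the agent's API):

use std::collections::BTreeMap;

// Illustrative sketch: assign the next disk id as (highest existing id + 1),
// so ids of removed disks are never handed out again.
fn next_disk_id(disks: &BTreeMap<usize, &str>) -> usize {
    disks.keys().next_back().map(|id| id + 1).unwrap_or(0)
}

fn main() {
    let mut disks = BTreeMap::new();
    disks.insert(0, "sda");
    disks.insert(1, "sdb");
    disks.insert(2, "sdc");
    disks.remove(&1);                    // remove a middle disk
    assert_eq!(next_disk_id(&disks), 3); // id 1 is not reused
}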
pub fn disks(&self) -> impl Iterator { - (0..self.disks.read().unwrap().len()).map(DiskId::new) + self.disks + .read() + .unwrap() + .keys() + .collect::>() + .into_iter() } pub fn path_to_disk_id(&self, path: &Path) -> Result { @@ -656,8 +688,8 @@ impl BlockAccess { .read() .unwrap() .iter() - .position(|disk| disk.canonical_path == canonical_path) - .map(DiskId::new) + .find(|(_, disk)| disk.canonical_path == canonical_path) + .map(|(disk_id, _)| disk_id) .ok_or_else(|| { anyhow!("disk {path:?} ({canonical_path:?}) is not part of the zettacache") }) @@ -669,7 +701,7 @@ impl BlockAccess { .disks .read() .unwrap() - .iter() + .values() .map(|d| DeviceEntry { name: d.path.clone(), size: *d.size.lock().unwrap(), @@ -683,7 +715,7 @@ impl BlockAccess { self.disks .read() .unwrap() - .iter() + .values() .map(|d| DeviceStatus { path: d.path.clone(), canonical_path: d.canonical_path.clone(), @@ -693,7 +725,12 @@ impl BlockAccess { } pub fn disk_size(&self, disk: DiskId) -> u64 { - *self.disks.read().unwrap()[disk.index()] + *self + .disks + .read() + .unwrap() + .get(disk) + .unwrap() .size .lock() .unwrap() @@ -707,7 +744,7 @@ impl BlockAccess { } pub fn disk_path(&self, disk: DiskId) -> PathBuf { - self.disks.read().unwrap()[disk.index()].path.clone() + self.disks.read().unwrap().get(disk).unwrap().path.clone() } pub fn total_capacity(&self) -> u64 { @@ -721,7 +758,7 @@ impl BlockAccess { self.verify_aligned(extent.size); let disk = extent.location.disk(); - let fut = self.disks.read().unwrap()[disk.index()].read( + let fut = self.disks.read().unwrap().get(disk).unwrap().read( extent.location.offset(), usize::from64(extent.size), io_type, @@ -744,7 +781,13 @@ impl BlockAccess { self.verify_aligned(location.offset()); self.verify_aligned(bytes.len()); let disk = location.disk(); - let fut = self.disks.read().unwrap()[disk.index()].write(location.offset(), bytes, io_type); + let fut = + self.disks + .read() + .unwrap() + .get(disk) + .unwrap() + .write(location.offset(), bytes, io_type); // drop disks RwLock before waiting for io fut.await; } @@ -916,7 +959,7 @@ impl BlockAccess { .disks .read() .unwrap() - .iter() + .values() .map(|disk| disk.io_stats) .collect(), } diff --git a/cmd/zfs_object_agent/zettacache/src/block_allocator/mod.rs b/cmd/zfs_object_agent/zettacache/src/block_allocator/mod.rs index b19f310fd904..2df0048ba995 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_allocator/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_allocator/mod.rs @@ -27,6 +27,7 @@ use util::with_alloctag; use util::writeln_stdout; use util::BitRange; use util::RangeTree; +use util::VecMap; use self::slabs::Slabs; use crate::base_types::*; @@ -110,12 +111,13 @@ trait SlabTrait { fn import_free(&mut self, extent: Extent); fn allocate(&mut self, size: u32) -> Option; fn free(&mut self, extent: Extent); - fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap); + fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) -> (u64, u64); fn condense_to_spacemap(&self, spacemap: &mut SpaceMap); fn mark_slab_info(&self, id: SlabId, spacemap: &mut SpaceMap); fn max_size(&self) -> u32; fn capacity_bytes(&self) -> u64; fn free_space(&self) -> u64; + fn freeing_space(&self) -> u64; fn allocated_space(&self) -> u64; fn num_segments(&self) -> u64; fn allocated_extents(&self) -> Vec; @@ -246,21 +248,17 @@ impl SlabTrait for BitmapSlab { "double free at slot {:?}", slot ); - - if self.allocating.contains(slot) { - assert!(!self.freeing.contains(slot)); - self.allocating.remove(slot); - } else { - 
self.freeing.insert(slot); - } + self.freeing.insert(slot); } - fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) { + fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) -> (u64, u64) { // It could happen that a segment was allocated and then freed within the same checkpoint // period at which point it would be part of both `allocating` and `freeing` sets. For // this reason we always record `allocating` first, before `freeing`, on our spacemaps. // Note that segments cannot be freed and then allocated within the same checkpoint // period. + + let allocated_bytes = u64::from(self.allocatable.len()) * u64::from(self.slot_size); for (slot, run) in self.allocating.iter_ranges() { spacemap.alloc(Extent { location: self.slot_to_location(slot), @@ -269,19 +267,20 @@ impl SlabTrait for BitmapSlab { } self.allocating.clear(); + // Space freed during this checkpoint is now available for reallocation. + let freed_bytes = u64::from(self.freeing.len()) * u64::from(self.slot_size); for (slot, run) in self.freeing.iter_ranges() { spacemap.free(Extent { location: self.slot_to_location(slot), size: u64::from(run) * u64::from(self.slot_size), }); - } - // Space freed during this checkpoint is now available for reallocation. - for (slot, run) in self.freeing.iter_ranges() { with_alloctag(Self::ALLOCATABLE_TAG, || { self.allocatable.insert_range(slot..(slot + run)) }); } self.freeing.clear(); + + (allocated_bytes, freed_bytes) } fn condense_to_spacemap(&self, spacemap: &mut SpaceMap) { @@ -324,8 +323,13 @@ impl SlabTrait for BitmapSlab { u64::from(self.allocatable.len()) * u64::from(self.slot_size) } + fn freeing_space(&self) -> u64 { + u64::from(self.freeing.len()) * u64::from(self.slot_size) + } + fn allocated_space(&self) -> u64 { - u64::from(self.total_slots - self.allocatable.len()) * u64::from(self.slot_size) + u64::from(self.total_slots - self.allocatable.len() - self.freeing.len()) + * u64::from(self.slot_size) } fn mark_slab_info(&self, id: SlabId, spacemap: &mut SpaceMap) { @@ -502,20 +506,10 @@ impl SlabTrait for ExtentSlab { let size = extent.size; self.allocatable.verify_absent(offset, size); - - match self.allocating.overlap(offset, size) { - Some(_) => { - self.freeing.verify_absent(offset, size); - self.allocating.remove(offset, size); - } - None => { - self.allocating.verify_absent(offset, size); - self.freeing.add(offset, size); - } - } + self.freeing.add(offset, size); } - fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) { + fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) -> (u64, u64) { self.freeing.verify_space(); self.allocating.verify_space(); self.allocatable.verify_space(); @@ -527,6 +521,7 @@ impl SlabTrait for ExtentSlab { // this reason we always record `allocating` first, before `freeing`, on our spacemaps. // Note that segments cannot be freed and then allocated within the same checkpoint // period. + let allocated_bytes = self.allocating.space(); for (&start, &size) in self.allocating.iter() { self.allocatable.verify_absent(start, size); spacemap.alloc(Extent::new(disk, start, size)); @@ -534,12 +529,15 @@ impl SlabTrait for ExtentSlab { self.allocating.clear(); // Space freed during this checkpoint is now available for reallocation. 
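The flush above always records `allocating` to the spacemap before `freeing`, and only then returns the freed slots to `allocatable`, now also reporting (allocated_bytes, freed_bytes) to the caller. A simplified model of one such checkpoint flush, assuming toy types rather than the real BitmapSlab/SpaceMap:

use std::collections::BTreeSet;

// Illustrative model of one checkpoint flush for a slab.
struct MiniSlab {
    slot_size: u64,
    allocatable: BTreeSet<u32>,
    allocating: BTreeSet<u32>,
    freeing: BTreeSet<u32>,
}

impl MiniSlab {
    fn flush(&mut self, log: &mut Vec<String>) -> (u64, u64) {
        let allocated = self.allocating.len() as u64 * self.slot_size;
        for slot in std::mem::take(&mut self.allocating) {
            log.push(format!("ALLOC slot {slot}"));
        }
        let freed = self.freeing.len() as u64 * self.slot_size;
        for slot in std::mem::take(&mut self.freeing) {
            log.push(format!("FREE slot {slot}"));
            self.allocatable.insert(slot); // freed space becomes allocatable again
        }
        (allocated, freed)
    }
}

fn main() {
    let mut slab = MiniSlab {
        slot_size: 512,
        allocatable: BTreeSet::new(),
        allocating: [7u32].into_iter().collect(),
        freeing: [7u32].into_iter().collect(), // allocated and freed in the same checkpoint
    };
    let mut log = Vec::new();
    assert_eq!(slab.flush(&mut log), (512, 512));
    assert_eq!(log, ["ALLOC slot 7", "FREE slot 7"]); // alloc recorded before free
    assert!(slab.allocatable.contains(&7));
}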
+ let freed_bytes = self.freeing.space(); for (&start, &size) in self.freeing.iter() { self.allocating.verify_absent(start, size); spacemap.free(Extent::new(disk, start, size)); with_alloctag(Self::ALLOCATABLE_TAG, || self.allocatable.add(start, size)); } self.freeing.clear(); + + (allocated_bytes, freed_bytes) } fn condense_to_spacemap(&self, spacemap: &mut SpaceMap) { @@ -574,8 +572,12 @@ impl SlabTrait for ExtentSlab { self.allocatable.space() } + fn freeing_space(&self) -> u64 { + self.freeing.space() + } + fn allocated_space(&self) -> u64 { - self.total_space - self.free_space() + self.total_space - self.free_space() - self.freeing_space() } fn mark_slab_info(&self, id: SlabId, spacemap: &mut SpaceMap) { @@ -676,8 +678,8 @@ impl SlabTrait for EvacuatingSlab { ); } - fn flush_to_spacemap(&mut self, _: &mut SpaceMap) { - panic!("attempting to flush evacuating slab",); + fn flush_to_spacemap(&mut self, _: &mut SpaceMap) -> (u64, u64) { + panic!("attempting to flush evacuating slab"); } fn condense_to_spacemap(&self, _: &mut SpaceMap) { @@ -696,6 +698,10 @@ impl SlabTrait for EvacuatingSlab { 0 } + fn freeing_space(&self) -> u64 { + 0 + } + fn allocated_space(&self) -> u64 { 0 } @@ -784,7 +790,8 @@ impl Slab { self.inner.as_dyn().mark_slab_info(self.id, spacemap); } - fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) { + // Returns (bytes_allocated, bytes_freed) + fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap) -> (u64, u64) { self.is_dirty = false; self.is_allocd = false; self.inner.as_mut_dyn().flush_to_spacemap(spacemap) @@ -805,6 +812,10 @@ impl Slab { self.inner.as_dyn().free_space() } + fn freeing_space(&self) -> u64 { + self.inner.as_dyn().freeing_space() + } + fn allocated_space(&self) -> u64 { self.inner.as_dyn().allocated_space() } @@ -1009,6 +1020,7 @@ impl BlockAllocatorBuilder { let phys = self.phys; let slabs = self.slabs; let block_access = self.block_access; + let removing_disks = slab_allocator.removing_disks().collect::>(); let next_slab_to_condense = phys.next_slab_to_condense; @@ -1019,22 +1031,33 @@ impl BlockAllocatorBuilder { phys.spacemap_next, ); - let mut available_space = 0u64; let mut evacuating_slabs = Vec::new(); + let mut noalloc_state = VecMap::<_, HashSet<_>>::default(); + let mut per_disk_stats = block_access + .disks() + .map(|disk| (disk, DiskStats::default())) + .collect::>(); let mut slabs_by_bucket: BTreeMap> = BTreeMap::new(); for slab in slabs.iter() { - available_space += slab.free_space(); + let slab_disk = slab.location().disk(); - match &slab.inner { - SlabEnum::BitmapBased(_) | SlabEnum::ExtentBased(_) => { - slabs_by_bucket - .entry(SlabBucketSize(slab.max_size())) - .or_default() - .push(slab.to_slab_bucket_entry()); - } - SlabEnum::Evacuating(_) => { - evacuating_slabs.push(slab.id); - } + let disk_stats = per_disk_stats.get_mut(slab_disk).unwrap(); + disk_stats.free_bytes += slab.free_space(); + disk_stats.alloc_bytes += slab.allocated_space(); + + if matches!(slab.inner, SlabEnum::Evacuating(_)) { + evacuating_slabs.push(slab.id); + } + + if let Some(&disk) = removing_disks.get(&slab_disk) { + noalloc_state.get_mut_or_default(disk).insert(slab.id); + disk_stats.noalloc_bytes += slab.free_space(); + } else { + assert_eq!(disk_stats.noalloc_bytes, 0); + slabs_by_bucket + .entry(SlabBucketSize(slab.max_size())) + .or_default() + .push(slab.to_slab_bucket_entry()); } } @@ -1049,15 +1072,26 @@ impl BlockAllocatorBuilder { dirty_slabs: Default::default(), slab_allocator, evacuating_slabs, + noalloc_state, slab_buckets, - 
available_space, - freeing_space: 0, + per_disk_stats, checkpoint_allocated_bytes: 0, block_access, } } } +#[derive(Default)] +struct DiskStats { + // The amount of allocated space in the slabs held by the block_allocator + alloc_bytes: u64, + // The amount of free space in the slabs held by the block_allocator (includes noalloc_bytes) + free_bytes: u64, + // The number of bytes that are free in the noalloc_slabs but is not available for + // allocations + noalloc_bytes: u64, +} + pub struct BlockAllocator { slab_size: u32, @@ -1106,10 +1140,34 @@ pub struct BlockAllocator { slab_allocator: Arc, evacuating_slabs: Vec, + // This field contains all the data related to the removal logic. Each disk ID that is used + // as a key in this map is a device marked for removal. The value is the set of slabs that + // are part of that disk (which are managed by the BlockAllocator). These slabs are not in + // the allocatable buckets, so they are not allocatable. The removal logic uses this field + // as follows: + // + // [1] When we mark a device for removal in the block allocator (see `mark_noalloc_disk()`) + // we move any slabs that belong to that disk, away from the allocation buckets, into a new + // disk entry in `noalloc_state` as a set (effectively marking the slabs as unavailable for + // future allocations). + // [2] Later, when there is enough free space in the cache for the allocated_space from those + // noalloc slabs, the merge code will submit a remap to move that allocated space to slabs of + // non-removing disks. + // [3] When the remap code empties all the noalloc slabs of the removing disk and gives them + // back to the slab_allocator then we no longer need that disk ID key and we remove it from + // our map. + // + // For details about cancellation see `unmark_noalloc_disk()`. + // + // This field is essentially a marker. Its keys are disk IDs of devices that are being + // removed AND have allocated space in the block allocator. So if a device marked for removal + // has no slabs currently in-use from the block allocator, it will not be in the map. 
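The lifecycle the comment above describes can be modeled compactly: marking a disk moves its slab ids under the disk's key, and once remap drains the set the key disappears and disk_is_pending_remap() turns false. A hypothetical stand-alone sketch (toy DiskId/SlabId aliases and HashMap/HashSet, not the agent's types):

use std::collections::{HashMap, HashSet};

type DiskId = u32;
type SlabId = u64;

#[derive(Default)]
struct RemovalState {
    allocatable_slabs: HashSet<SlabId>,
    noalloc_state: HashMap<DiskId, HashSet<SlabId>>,
}

impl RemovalState {
    fn mark_noalloc_disk(&mut self, disk: DiskId, slabs_of_disk: &[SlabId]) {
        for &slab in slabs_of_disk {
            if self.allocatable_slabs.remove(&slab) {
                self.noalloc_state.entry(disk).or_default().insert(slab);
            }
        }
    }
    fn slab_evacuated(&mut self, disk: DiskId, slab: SlabId) {
        if let Some(set) = self.noalloc_state.get_mut(&disk) {
            set.remove(&slab);
        }
        // Drop per-disk entries that are fully drained (step [3] in the comment above).
        self.noalloc_state.retain(|_, slabs| !slabs.is_empty());
    }
    fn disk_is_pending_remap(&self, disk: DiskId) -> bool {
        self.noalloc_state.contains_key(&disk)
    }
}

fn main() {
    let mut state = RemovalState::default();
    state.allocatable_slabs.extend([1, 2, 3]);
    state.mark_noalloc_disk(9, &[1, 2]);
    assert!(state.disk_is_pending_remap(9));
    state.slab_evacuated(9, 1);
    state.slab_evacuated(9, 2);
    assert!(!state.disk_is_pending_remap(9)); // fully drained: key removed
}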
+ noalloc_state: VecMap>, + slab_buckets: SlabAllocationBuckets, - available_space: u64, - freeing_space: u64, + // Space statistics per disk + per_disk_stats: VecMap, // used only by incoming rate heuristic for condensing checkpoint_allocated_bytes: u64, @@ -1150,15 +1208,17 @@ impl BlockAllocator { bucket.insert(new_slab.to_slab_bucket_entry()); let extent = new_slab.allocate(request_size).unwrap(); + + self.stats_add_new_slab(&new_slab); + self.stats_track_allocation(extent); + let old = self.slabs.insert(new_id, new_slab); assert!(old.is_none()); - self.available_space += self.slab_allocator.slab_size(); + self.mark_slab_info(new_id); self.dirty_slab_id(new_id); trace!("{new_id:?} added to {bucket_size:?}"); - self.available_space -= extent.size; - self.checkpoint_allocated_bytes += extent.size; Some(extent) } @@ -1209,8 +1269,7 @@ impl BlockAllocator { extent ); self.dirty_slab_id(id); - self.available_space -= extent.size; - self.checkpoint_allocated_bytes += extent.size; + self.stats_track_allocation(extent); return Some(extent); } None => { @@ -1252,10 +1311,127 @@ impl BlockAllocator { let slab_id = self.slab_allocator.extent_to_slab_id(extent); self.slabs.get_mut(slab_id).free(extent); - self.freeing_space += extent.size; + + self.stats_track_free(extent); self.dirty_slab_id(slab_id); } + pub fn add_disk(&mut self, disk: DiskId) { + let inserted = self.per_disk_stats.insert(disk, Default::default()); + assert!(inserted.is_none()); + } + + pub fn remove_disk(&mut self, disk: DiskId) { + self.stats_verify_disk_is_empty(disk); + let removed = self.per_disk_stats.remove(disk); + assert!(removed.is_some()); + assert!(self.noalloc_state.get(disk).is_none()); + } + + /// Remove all slabs that belong to `disk` from our sorted slab buckets, + /// effectively forbidding any future allocations from them. + pub fn mark_noalloc_disk(&mut self, disk: DiskId) { + let disk_stats = self.per_disk_stats.get_mut(disk).unwrap(); + for bucket in self.slab_buckets.0.values_mut() { + bucket.by_freeness.retain(|entry| { + let slab = self.slabs.get(entry.slab_id); + if slab.location().disk() != disk { + true + } else { + self.noalloc_state + .get_mut_or_default(disk) + .insert(entry.slab_id); + + disk_stats.noalloc_bytes += slab.free_space(); + false + } + }); + + // Even if we removed all the disk's slabs from the allocation bucket the bucket can + // still point to one of those slabs if we allocated from it recently. + if let Some(bucket_current) = bucket.last_allocated { + if let Some(noalloc_slabs) = self.noalloc_state.get(disk) { + if noalloc_slabs.contains(&bucket_current.slab_id) { + bucket.advance(); + } + } + } + } + + // Make sure we also track any slabs from that disk that have already been submitted for + // evacuation. + for &slab_id in self.evacuating_slabs.iter() { + if self.slabs.get(slab_id).location().disk() == disk { + self.noalloc_state.get_mut_or_default(disk).insert(slab_id); + } + } + + // If the supplied `disk` had any free data the space of that data should equal the + // amount of space marked as non-allocatable. + assert_eq!(disk_stats.free_bytes, disk_stats.noalloc_bytes); + } + + /// Place any slabs marked as non-allocatable back to the sorted slab + /// buckets allowing us to allocate from them again. + pub fn unmark_noalloc_disk(&mut self, disk: DiskId) { + // Remaping the disk's data may have already finished, at which point there is nothing + // for us to do here. 
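mark_noalloc_disk() above relies on a retain-based partition: entries whose disk matches the removing one are moved out of the allocatable bucket into the noalloc set, and everything else is kept. A minimal sketch of that pattern, with illustrative (disk, slab) tuples standing in for SlabBucketEntry:

// Illustrative sketch of the retain-based partition used by mark_noalloc_disk().
fn partition_out(bucket: &mut Vec<(u32, u64)>, removing_disk: u32) -> Vec<u64> {
    let mut noalloc = Vec::new();
    bucket.retain(|&(disk, slab)| {
        if disk == removing_disk {
            noalloc.push(slab);
            false // drop from the allocatable bucket
        } else {
            true // keep
        }
    });
    noalloc
}

fn main() {
    let mut bucket = vec![(0, 10), (1, 11), (0, 12)];
    let noalloc = partition_out(&mut bucket, 0);
    assert_eq!(bucket, vec![(1, 11)]);
    assert_eq!(noalloc, vec![10, 12]);
}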
+ if !self.disk_is_pending_remap(disk) { + self.stats_verify_disk_is_empty(disk); + return; + } + + let disk_stats = self.per_disk_stats.get_mut(disk).unwrap(); + self.noalloc_state + .get_mut(disk) + .unwrap() + .retain(|&slab_id| { + let slab = self.slabs.get(slab_id); + disk_stats.noalloc_bytes -= slab.free_space(); + + // Note: Any noalloc slabs that are marked as evacuating (e.g. they are currently + // being remapped) are not part of the allocation buckets and their free space + // is not accounted in the block allocator. Given that plus the fact that they + // will be released back to the slab allocator once remap is done, we want to + // skip them. + if matches!(slab.inner, SlabEnum::Evacuating(_)) { + return true; + } + + let bucket_size = self + .slab_buckets + .get_bucket_size_for_allocation_size(slab.max_size()); + self.slab_buckets + .get_bucket_for_bucket_size(bucket_size) + .insert(slab.to_slab_bucket_entry()); + + false + }); + // At this point everything is either back to allocatable or is currently evacuating. + // Either way there noalloc_bytes should be zero. + assert_eq!(disk_stats.noalloc_bytes, 0); + + // If the remap for this disk was never submitted, it's set of noalloc slabs should be + // empty, in which case we can finish cleaning up its noalloc_state entry here. + if self.noalloc_state.get(disk).unwrap().is_empty() { + self.noalloc_state.remove(disk); + } + } + + fn noalloc_slabs_to_remap(&self, disk: DiskId) -> Vec { + match self.noalloc_state.get(disk) { + Some(slabs) => slabs.iter().copied().collect(), + None => Vec::new(), + } + } + + /// Returns true if the disk is marked for removal and it is waiting for a remap to empty its + /// slabs from the block allocator. Otherwise, the device is either non-removing or it is + /// removing but has no slabs in-use from the block allocator. + pub fn disk_is_pending_remap(&self, disk: DiskId) -> bool { + self.noalloc_state.contains_key(&disk) + } + // This function is the entry-point to starting the cache rebalancing process. This will select // which slab(s) need to be rebalanced, mark those slabs as undergoing evacuation, and // allocate new disk locations for the data currently stored on those slabs. The value @@ -1273,36 +1449,55 @@ impl BlockAllocator { // the rebalance process as finished. This allows the allocator to transition the slabs that // were undergoing evacuation to free slabs, such that the slabs can later be used for // allocation. - pub fn rebalance_init(&mut self) -> Option>> { + pub fn rebalance_init( + &mut self, + removing_disk: Option, + ) -> Option>> { // For now, ensure rebalance_fini() is called before this function can be called a second // time. assert!(self.evacuating_slabs.is_empty()); let begin = Instant::now(); - let slabs = self.slabs_to_rebalance(); + let slabs = match removing_disk { + Some(disk) => self.noalloc_slabs_to_remap(disk), + None => self.slabs_to_rebalance(), + }; if slabs.is_empty() { info!("cache rebalance is not needed"); return None; } - info!("initializing rebalance of {} slabs", slabs.len()); + info!( + "initializing rebalance of {} slabs {}", + slabs.len(), + if removing_disk.is_some() { + "(triggered by removal)" + } else { + "" + } + ); - // In order to ensure the allocations performed in rebalance_slab() (called below) are - // not satisfied by any of the slabs we're going to rebalance, we need to remove these - // slabs from the list of slabs available for allocation. 
Further, we must remove all - // slabs before we do any allocations, to ensure we don't move an extent multiple times; - // otherwise, data corruption could occur, as the data contained in the extents, can be - // moved by the caller in any order. - // - // For example, if we mark an extent as moving from disk location A to B, and then again - // from B to C, the final data contained at disk location C could be incorrect, if the - // caller does the move of B to C before the move of A to B. Since we do not enforce the - // order in which the caller will do the copies, we need to ensure this cannot happen, by - // never moving an extent more than once. - for &id in slabs.iter() { - trace!("prepping slab '{:?}' for rebalancing", id); - self.slab_buckets.remove_slab(self.slabs.get(id)); + if removing_disk.is_none() { + // In order to ensure the allocations performed in rebalance_slab() (called below) are + // not satisfied by any of the slabs we're going to rebalance, we need to remove these + // slabs from the list of slabs available for allocation. Further, we must remove all + // slabs before we do any allocations, to ensure we don't move an extent multiple times; + // otherwise, data corruption could occur, as the data contained in the extents, can be + // moved by the caller in any order. + // + // For example, if we mark an extent as moving from disk location A to B, and then again + // from B to C, the final data contained at disk location C could be incorrect, if the + // caller does the move of B to C before the move of A to B. Since we do not enforce the + // order in which the caller will do the copies, we need to ensure this cannot happen, + // by never moving an extent more than once. + // + // Note that for removal we've already removed the removing disk's slabs from allocation + // buckets so there should be nothing to remove. + for &id in slabs.iter() { + trace!("prepping slab '{:?}' for rebalancing", id); + self.slab_buckets.remove_slab(self.slabs.get(id)); + } } let mut merged = 0; @@ -1325,6 +1520,10 @@ impl BlockAllocator { merged, ); + if let Some(disk) = removing_disk { + self.stats_verify_disk_is_empty(disk); + } + assert!(!self.evacuating_slabs.is_empty()); Some(map) } @@ -1362,7 +1561,10 @@ impl BlockAllocator { .slabs .iter() .filter(|&slab| match slab.inner { - SlabEnum::BitmapBased(_) | SlabEnum::ExtentBased(_) => true, + SlabEnum::BitmapBased(_) | SlabEnum::ExtentBased(_) => { + // Skip over slabs from disks being removed + !self.noalloc_state.contains_key(&slab.location().disk()) + } SlabEnum::Evacuating(_) => false, }) .map(|slab| slab.to_slab_bucket_entry()) @@ -1492,8 +1694,12 @@ impl BlockAllocator { // Since evacuating slabs don't have any allocatable space, we must account for that // here; we must do this before we transition to an evacuating slab (evacuating slabs // have no free space). - let slab = self.slabs.get(id); - self.available_space -= slab.free_space(); + self.stats_track_slab_evacuation(id); + + // XXX: Currently it is not possible to call `BlockAllocator.free()` from the + // checkpoint_task in-between the point that we flush a checkpoint and the point that we + // start a rebalance_init(). Thet is expected to change once DLPX-80824 lands. + assert_eq!(self.slabs.get(id).freeing_space(), 0); trace!("marking slab '{:?}' as evacuating", id); @@ -1516,6 +1722,13 @@ impl BlockAllocator { // evacuating slabs cannot allocate() or free(); thus, they should never be dirty. 
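The comment above is the reason slabs are pulled out of the allocation buckets before any rebalance allocations: within one rebalance, no remap destination may itself be a remap source, otherwise applying B->C before A->B leaves wrong data at C. A small sketch of that invariant as a check over a hypothetical source-to-destination table:

use std::collections::{HashMap, HashSet};

// Illustrative check of the "never move an extent more than once" rule.
fn assert_no_chained_remaps(remaps: &HashMap<u64, u64> /* source -> destination */) {
    let sources: HashSet<u64> = remaps.keys().copied().collect();
    for &dest in remaps.values() {
        assert!(!sources.contains(&dest), "extent {dest} would be moved twice");
    }
}

fn main() {
    let mut remaps = HashMap::new();
    remaps.insert(100, 200); // A -> B
    remaps.insert(300, 400);
    assert_no_chained_remaps(&remaps); // fine: no destination is also a source
}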
assert!(!self.slabs.get(id).is_dirty); + // if slab is part of removing disk, remove it from the noalloc slabs. + let slab_disk = self.slabs.get(id).location().disk(); + if let Some(noalloc_set) = self.noalloc_state.get_mut(slab_disk) { + let removed = noalloc_set.remove(&id); + assert!(removed); + } + self.slab_allocator.free(id); self.slabs.remove(id); @@ -1528,6 +1741,11 @@ impl BlockAllocator { }; target_spacemap.mark_slab_info(id, SlabPhysType::Free); } + + // This could be the end of the remap of a removing disk. If that's the case the set of + // (noalloc) slabs for that disk should be empty, which means we can remove its entry + // from the noalloc_state. + self.noalloc_state.retain(|_, slabs| !slabs.is_empty()); } /// Return number of slabs to condense, based on the "spacemap badness" ratio. Note that the @@ -1624,7 +1842,9 @@ impl BlockAllocator { let begin = Instant::now(); let old_pending = self.spacemap.pending_len() + self.spacemap_next.pending_len(); let ndirty_slabs = self.dirty_slabs.len(); - let mut allocd_slabs: u64 = 0; + let mut allocd_slabs = 0u64; + let mut total_allocated = 0; + let mut total_freed = 0; for slab_id in mem::take(&mut self.dirty_slabs) { if !self.slabs.exists(slab_id) { // This can happen if the slab was evacuated. @@ -1650,16 +1870,20 @@ impl BlockAllocator { } else { &mut self.spacemap_next }; - slab.flush_to_spacemap(target_spacemap); + let (allocated, freed) = slab.flush_to_spacemap(target_spacemap); + + self.stats_flush_freeing(slab_id, freed); + total_allocated += allocated; + total_freed += freed; } debug!( - "flushed {} slabs ({} allocd), {} entries in {}ms", - ndirty_slabs, - allocd_slabs, + "flushed {} entries ({} allocd, {} freed) to {ndirty_slabs} ({allocd_slabs} allocated from) in {}ms", nice_number_count( (self.spacemap.pending_len() + self.spacemap_next.pending_len() - old_pending) as f64 ), + nice_p2size(total_allocated), + nice_p2size(total_freed), begin.elapsed().as_millis() ); } @@ -1709,9 +1933,6 @@ impl BlockAllocator { self.flush_dirty(); let (spacemap, spacemap_next) = self.flush_impl().await; self.resort_buckets(); - - self.available_space += self.freeing_space; - self.freeing_space = 0; self.checkpoint_allocated_bytes = 0; if completed_merge { @@ -1755,14 +1976,110 @@ impl BlockAllocator { phys } - /// Return the amount of space in unallocated blocks. This does not include space in empty - /// slabs. - pub fn available(&self) -> u64 { - self.available_space + /// Return the amount of allocated space in slabs that are marked for removal. + pub fn removing_bytes(&self) -> u64 { + self.noalloc_state + .keys() + .map(|disk| self.per_disk_stats.get(disk).unwrap().alloc_bytes) + .sum() + } + + pub fn disk_allocated_bytes(&self, disk: DiskId) -> u64 { + self.per_disk_stats.get(disk).unwrap().alloc_bytes + } + + /// Return the amount of space in unallocated blocks. This does not include + /// space in empty slabs. + pub fn free_bytes(&self) -> u64 { + self.per_disk_stats + .values() + .map(|stats| stats.free_bytes) + .sum::() + } + + /// Return the amount of space in unallocated blocks that's available for + /// allocations. This does not include space in empty slabs nor space in + /// slabs that are part of a device being removed. 
+ pub fn allocatable_bytes(&self) -> u64 { + let free_bytes = self + .per_disk_stats + .values() + .map(|stats| stats.free_bytes) + .sum::(); + let noalloc_bytes = self + .per_disk_stats + .values() + .map(|stats| stats.noalloc_bytes) + .sum::(); + free_bytes - noalloc_bytes + } + + /// Incorporate this newly-created slab's free space into the BlockAllocator's `per_disk_stats`. + fn stats_add_new_slab(&mut self, slab: &Slab) { + self.per_disk_stats + .get_mut(slab.location().disk()) + .unwrap() + .free_bytes += slab.capacity_bytes(); + } + + /// Track the newly-allocated `extent` in the BlockAllocator's `per_disk_stats`. + fn stats_track_allocation(&mut self, extent: Extent) { + let disk_stats = self.per_disk_stats.get_mut(extent.location.disk()).unwrap(); + disk_stats.free_bytes -= extent.size; + disk_stats.alloc_bytes += extent.size; + assert_eq!( + disk_stats.noalloc_bytes, 0, + "can't allocate from non-allocatable disk" + ); + self.checkpoint_allocated_bytes += extent.size; + } + + /// Track the `extent` that was just freed in the BlockAllocator's `per_disk_stats`. + fn stats_track_free(&mut self, extent: Extent) { + let disk = extent.location.disk(); + + let disk_stats = self.per_disk_stats.get_mut(disk).unwrap(); + disk_stats.alloc_bytes -= extent.size; + if !self.noalloc_state.contains_key(&disk) { + assert_eq!(disk_stats.noalloc_bytes, 0); + } + } + + /// Update the `free_bytes` of the disk where `slab_id` belongs to with the amount that was + /// just `freed` when flushing that slab. If that slab is part of a removing disk then + /// `noalloc_bytes` from that disk is also incremented by `freed`. + fn stats_flush_freeing(&mut self, slab_id: SlabId, freed: u64) { + let disk = self.slabs.get(slab_id).location().disk(); + + let disk_stats = self.per_disk_stats.get_mut(disk).unwrap(); + disk_stats.free_bytes += freed; + if self.noalloc_state.contains_key(&disk) { + disk_stats.noalloc_bytes += freed; + } + } + + /// Remove any space accounted for the supplied slab from `per_disk_stats` as we are + /// preparing it for evacuation. + fn stats_track_slab_evacuation(&mut self, slab_id: SlabId) { + let slab = self.slabs.get(slab_id); + let slab_disk = slab.location().disk(); + + let disk_stats = self.per_disk_stats.get_mut(slab_disk).unwrap(); + disk_stats.free_bytes -= slab.free_space(); + disk_stats.alloc_bytes -= slab.allocated_space(); + if self.noalloc_state.contains_key(&slab_disk) { + disk_stats.noalloc_bytes -= slab.free_space(); + } else { + assert_eq!(disk_stats.noalloc_bytes, 0); + } } - pub fn freeing(&self) -> u64 { - self.freeing_space + /// Verify that the supplied `disk` doesn't contribute any space in `per_disk_stats`. + fn stats_verify_disk_is_empty(&self, disk: DiskId) { + let disk_stats = self.per_disk_stats.get(disk).unwrap(); + assert_eq!(disk_stats.alloc_bytes, 0); + assert_eq!(disk_stats.free_bytes, 0); + assert_eq!(disk_stats.noalloc_bytes, 0); } fn mark_slab_info(&mut self, id: SlabId) { @@ -1774,6 +2091,13 @@ impl BlockAllocator { }; slab.mark_slab_info(target_spacemap); } + + // Returns the number of slabs moved. 
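The per-disk accounting above keeps noalloc_bytes as the slice of free_bytes that is not allocatable because its disk is being removed, so cache-wide allocatable space is sum(free_bytes) - sum(noalloc_bytes). A minimal sketch of those invariants with sample numbers (toy struct mirroring DiskStats, not the agent's code):

#[derive(Clone, Copy)]
struct DiskStats {
    alloc_bytes: u64,
    free_bytes: u64,
    noalloc_bytes: u64, // subset of free_bytes that is not allocatable
}

fn allocatable_bytes(stats: &[DiskStats]) -> u64 {
    stats.iter().map(|s| s.free_bytes - s.noalloc_bytes).sum()
}

fn main() {
    let normal = DiskStats { alloc_bytes: 600, free_bytes: 400, noalloc_bytes: 0 };
    // A disk marked for removal: all of its free space is non-allocatable.
    let removing = DiskStats { alloc_bytes: 300, free_bytes: 700, noalloc_bytes: 700 };
    assert_eq!(allocatable_bytes(&[normal, removing]), 400);
    // Space still to be remapped off the removing disk:
    assert_eq!(removing.alloc_bytes, 300);
}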
+ pub async fn transfer_metadata_for_removal(&mut self, disk: DiskId) -> u64 { + let mut moved = self.spacemap.transfer_data_for_removal(disk).await; + moved += self.spacemap_next.transfer_data_for_removal(disk).await; + moved + } } #[derive(Debug, Serialize, Deserialize, Clone, Copy)] diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs index 25dd774e666d..b06f3789a565 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs @@ -126,6 +126,42 @@ impl BlockBasedLogPhys { } } + /// Find slabs used in this log that are located in the `removing_disk` and move their data + /// to newly allocated slabs from non-removing disks. Returns the number of slabs moved. + pub async fn transfer_data_for_removal( + &mut self, + removing_disk: DiskId, + slab_allocator: &SlabAllocator, + block_access: &BlockAccess, + ) -> u64 { + let mut moved = 0u64; + for slab in &mut self.slabs { + let slab_extent = slab_allocator.slab_id_to_extent(*slab); + if slab_extent.location.disk() != removing_disk { + continue; + } + + let new_slab = slab_allocator.allocate_reserved(); + let new_slab_extent = slab_allocator.slab_id_to_extent(new_slab); + assert!(new_slab_extent.location.disk() != removing_disk); + + let bytes = block_access + .read_raw(slab_extent, DiskIoType::MaintenanceRead) + .await; + block_access + .write_raw( + new_slab_extent.location, + bytes, + DiskIoType::MaintenanceWrite, + ) + .await; + slab_allocator.free(*slab); + *slab = new_slab; + moved += 1; + } + moved + } + fn written_extents<'a>( &'a self, slab_access: &'a SlabAccess, @@ -327,6 +363,13 @@ impl BlockBasedLog { self.phys.trimmed = offset; } + // Returns the number of slabs moved. + pub async fn transfer_data_for_removal(&mut self, removing_disk: DiskId) -> u64 { + self.phys + .transfer_data_for_removal(removing_disk, &self.slab_allocator, &self.block_access) + .await + } + async fn flush_impl(&mut self, mut new_chunk_fn: F) where F: FnMut(ChunkId, LogOffset, T), diff --git a/cmd/zfs_object_agent/zettacache/src/checkpoint.rs b/cmd/zfs_object_agent/zettacache/src/checkpoint.rs index e51714d2618f..a3d1bd94c583 100644 --- a/cmd/zfs_object_agent/zettacache/src/checkpoint.rs +++ b/cmd/zfs_object_agent/zettacache/src/checkpoint.rs @@ -25,6 +25,7 @@ use crate::slab_allocator::SlabAllocator; use crate::slab_allocator::SlabAllocatorBuilder; use crate::slab_allocator::SlabAllocatorPhys; use crate::zettacache::merge::MergeProgressPhys; +use crate::zettacache::removal::DeviceRemovalPhys; use crate::zettacache::OperationLogEntry; #[derive(Serialize, Deserialize, Default, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] @@ -46,6 +47,8 @@ pub struct CheckpointPhys { pub operation_log: BlockBasedLogPhys, pub size_histogram: SizeHistogramPhys, pub merge_progress: Option, + #[serde(default)] + pub device_removal: DeviceRemovalPhys, } impl CheckpointPhys { diff --git a/cmd/zfs_object_agent/zettacache/src/features.rs b/cmd/zfs_object_agent/zettacache/src/features.rs index 362140708411..8b7e7670a518 100644 --- a/cmd/zfs_object_agent/zettacache/src/features.rs +++ b/cmd/zfs_object_agent/zettacache/src/features.rs @@ -29,6 +29,7 @@ lazy_static! { SLAB_SIZE_32MB.clone(), TRIMMED_INDEX.clone(), DISK_GUIDS.clone(), + CACHE_DEVICE_REMOVAL.clone(), ] .map(|feature: Feature| (feature.name, feature.info)) .into_iter() @@ -53,6 +54,10 @@ lazy_static! 
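transfer_data_for_removal() above is a copy-and-repoint loop: each log slab on the removing disk is copied into a freshly allocated slab on another disk, the old slab is freed, and the log's slab reference is updated in place. An in-memory sketch of that loop under toy assumptions (no real disk I/O; closures stand in for the slab allocator and BlockAccess):

// Illustrative sketch only, not BlockBasedLogPhys itself.
fn transfer_for_removal(
    log_slabs: &mut [usize],                  // slab ids referenced by one log
    slab_disk: impl Fn(usize) -> u32,         // which disk a slab lives on
    mut allocate_elsewhere: impl FnMut() -> usize,
    slab_data: &mut Vec<Vec<u8>>,             // slab contents, indexed by slab id
    removing_disk: u32,
) -> u64 {
    let mut moved = 0;
    for slab in log_slabs.iter_mut() {
        if slab_disk(*slab) != removing_disk {
            continue;
        }
        let new_slab = allocate_elsewhere();
        assert_ne!(slab_disk(new_slab), removing_disk);
        let bytes = slab_data[*slab].clone(); // read the old slab's raw contents
        slab_data[new_slab] = bytes;          // write them to the new slab
        *slab = new_slab;                     // repoint the log entry
        moved += 1;
    }
    moved
}

fn main() {
    // Toy layout: slabs 0 and 1 live on disk 0 (being removed), 2 and 3 on disk 1.
    let disk_of = |slab: usize| if slab < 2 { 0 } else { 1 };
    let mut data = vec![vec![0xAAu8; 4], vec![0xBB; 4], vec![0; 4], vec![0; 4]];
    let mut log_slabs = vec![0, 1];
    let mut next_free = 2;
    let moved = transfer_for_removal(
        &mut log_slabs,
        disk_of,
        || { let s = next_free; next_free += 1; s },
        &mut data,
        0,
    );
    assert_eq!(moved, 2);
    assert_eq!(log_slabs, vec![2, 3]); // log now points at slabs on disk 1
    assert_eq!(data[2], vec![0xAA; 4]);
}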
{ name: FeatureName("com.delphix:disk_guids".to_string()), info: FeatureType::Upgradeable }; + pub static ref CACHE_DEVICE_REMOVAL: Feature = Feature { + name: FeatureName("com.delphix:cache_device_removal".to_string()), + info: FeatureType::Upgradeable + }; } #[derive(Debug)] diff --git a/cmd/zfs_object_agent/zettacache/src/open.rs b/cmd/zfs_object_agent/zettacache/src/open.rs index c6ab7cb87237..d738d28960d7 100644 --- a/cmd/zfs_object_agent/zettacache/src/open.rs +++ b/cmd/zfs_object_agent/zettacache/src/open.rs @@ -30,9 +30,15 @@ pub enum CacheOpenMode { } impl CacheOpenMode { - pub async fn device_paths(self) -> Result> { + pub async fn device_paths(self) -> Result> { Ok(match self { - CacheOpenMode::DeviceList(paths) => paths, + CacheOpenMode::DeviceList(paths) => { + let mut disk_map = BTreeMap::new(); + for (id, path) in paths.into_iter().enumerate() { + disk_map.insert(DiskId::new(id), path); + } + disk_map + } CacheOpenMode::DiscoveryDirectory(dir, target_guid) => { discover_devices(&dir, target_guid).await? } @@ -75,7 +81,10 @@ impl DiscoveredDevice { } } -async fn discover_devices(dir_path: &Path, target_guid: Option) -> Result> { +async fn discover_devices( + dir_path: &Path, + target_guid: Option, +) -> Result> { let mut caches = HashMap::<_, BTreeMap<_, _>>::new(); let mut discovery = FuturesUnordered::new(); @@ -143,19 +152,21 @@ async fn discover_devices(dir_path: &Path, target_guid: Option) -> Re caches.retain(|cache_guid, _| *cache_guid == guid); } - match caches.values().next() { - Some(cache) => { - if caches.len() > 1 { - Err(anyhow!( - "multiple valid caches found in {dir_path:?}: {:?}", - caches.keys().collect::>(), - )) - } else { - Ok(cache - .iter() - .map(|(_, dev)| dev.device_path.clone()) - .collect()) - } + if caches.len() > 1 { + return Err(anyhow!( + "multiple valid caches found in {dir_path:?}: {:?}", + caches.keys().collect::>(), + )); + } + + let cache_info = caches.drain().next(); + match cache_info { + Some((cache_guid, cache)) => { + info!("discovery: importing cache '{cache_guid:?}'"); + Ok(cache + .into_iter() + .map(|((disk_id, _), dev)| (disk_id, dev.device_path)) + .collect()) } None => Err(anyhow!("no valid caches found in {dir_path:?}")), } @@ -165,9 +176,10 @@ fn is_valid_cache( cache_guid: CacheGuid, disks: &mut BTreeMap<(DiskId, Option), DiscoveredDevice>, ) -> bool { + // We always pick the latest primary block (determined by checkpoint_id) let primary = disks .values() - .find(|&disk| disk.superblock.primary.is_some()); + .max_by_key(|d| d.superblock.primary.as_ref().map(|p| p.checkpoint_id)); match primary { Some(disk) => { diff --git a/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs b/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs index 0fadd7663cdf..b895720cf95c 100644 --- a/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs +++ b/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs @@ -1,4 +1,4 @@ -//! The disk is divided into equal-size slabs (by default, 16MB each). The SlabAllocator knows +//! The disk is divided into equal-size slabs (by default, 32MB each). The SlabAllocator knows //! which slabs are allocated and which are free. Other subsystems (e.g. BlockBasedLogs, the //! BlockAllocator, and Checkpoints) can call into the SlabAllocator to allocate slabs for their //! own use. @@ -9,8 +9,11 @@ //! Allocator which slabs are allocated, by calling SlabAllocatorBuilder::claim(). 
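The discovery change above picks the primary superblock with the highest checkpoint_id, and disks with no primary label sort below any labeled disk because max_by_key over Option<_> orders None before Some. A small illustration with made-up device names:

#[derive(Debug)]
struct Discovered {
    name: &'static str,
    primary_checkpoint: Option<u64>,
}

// Illustrative sketch of the max_by_key selection used in discover_devices().
fn pick_primary(disks: &[Discovered]) -> Option<&Discovered> {
    disks.iter().max_by_key(|d| d.primary_checkpoint)
}

fn main() {
    let disks = [
        Discovered { name: "sda", primary_checkpoint: Some(41) },
        Discovered { name: "sdb", primary_checkpoint: None },     // no primary label
        Discovered { name: "sdc", primary_checkpoint: Some(42) }, // newest checkpoint
    ];
    assert_eq!(pick_primary(&disks).unwrap().name, "sdc");
}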
use std::cmp::max; +use std::collections::BTreeMap; use std::collections::HashSet; +use std::mem; use std::ops::Add; +use std::ops::AddAssign; use std::ops::Bound::*; use std::ops::Sub; use std::sync::Mutex; @@ -28,7 +31,9 @@ use util::measure; use util::tunable; use util::tunable::Percent; use util::From64; +use util::VecMap; +use crate::base_types::DiskId; use crate::base_types::Extent; tunable! { @@ -51,10 +56,25 @@ tunable! { static ref TARGET_AVAILABLE_SLABS_PCT: Percent = Percent::new(2.0); } +// Note: Before device removal we used `Extent`s to indicate a present chunk of +// capacity. The `untagged` and `flatten` serde annotations below are used to +// make new bits backwards compatible with the old on-disk format. +#[derive(Debug, Serialize, Deserialize, Copy, Clone, Eq, Ord, PartialEq, PartialOrd)] +#[serde(untagged)] +enum SlabExtentPhys { + Present { + #[serde(flatten)] + extent: Extent, + }, + Removed { + extent: Extent, + }, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct SlabAllocatorPhys { slab_size: u64, - capacity: Vec, + capacity: Vec, } #[derive(Debug)] @@ -65,8 +85,10 @@ pub struct SlabAccess { #[derive(Debug)] struct SlabAccessInner { - capacity: BiBTreeMap, + capacity: BTreeMap, + active_capacity: BiBTreeMap, num_slabs: u64, + next_slab_id: SlabId, } #[derive(Debug)] @@ -85,9 +107,29 @@ pub struct SlabAllocator { struct Inner { allocatable: Vec, freeing: Vec, + + // If there is an entry in the map below, then the entire disk has been marked as noalloc, + // and none of its slabs will appear in `allocatable`. When the disk is finally removed, its + // key is removed from this map. Note that the slab IDs that belong to a removed disk are + // never re-used for the lifetime of the cache. + noalloc_state: VecMap>, + // Slabs that are released/freed back to the SlabAllocator and belong to a removing disk, end + // up here instead of `freeing`, and eventually make it back to the noalloc_state for that + // disk. 
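A quick illustration of why the untagged/flatten annotations on SlabExtentPhys above keep the old on-disk format readable: a Present entry serializes exactly like the bare Extent that pre-removal caches wrote, while Removed nests under an "extent" key. The sketch below uses serde_json and a simplified two-field Extent purely for demonstration; the agent's actual encoding differs:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone)]
struct Extent {
    offset: u64,
    size: u64,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(untagged)]
enum SlabExtentPhys {
    Present {
        #[serde(flatten)]
        extent: Extent,
    },
    Removed {
        extent: Extent,
    },
}

fn main() -> Result<(), serde_json::Error> {
    let present = SlabExtentPhys::Present { extent: Extent { offset: 0, size: 32 << 20 } };
    // Same shape as a plain Extent from the pre-removal format.
    assert_eq!(serde_json::to_string(&present)?, r#"{"offset":0,"size":33554432}"#);
    // Old data (a bare Extent) still deserializes, as the Present variant.
    let old: SlabExtentPhys = serde_json::from_str(r#"{"offset":0,"size":33554432}"#)?;
    assert_eq!(old, present);
    // The Removed variant is distinguishable because it nests under "extent".
    let removed = SlabExtentPhys::Removed { extent: Extent { offset: 0, size: 32 << 20 } };
    assert_eq!(serde_json::to_string(&removed)?, r#"{"extent":{"offset":0,"size":33554432}}"#);
    Ok(())
}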
+ noallocing: Vec, + reserved_slabs: u64, } +impl Inner { + fn num_removing_slabs(&self, access: &SlabAccess) -> u64 { + self.noalloc_state + .keys() + .map(|disk| access.disk_num_slabs(disk)) + .sum::() + } +} + impl SlabAllocatorPhys { pub fn new>(capacity: I) -> Self { let mut this = Self { @@ -105,13 +147,20 @@ impl SlabAllocatorPhys { { for extent in capacity { // capacity is aligned to be a multiple of slabsize - self.capacity - .push(extent.range(0, extent.size - extent.size % self.slab_size)); + self.capacity.push(SlabExtentPhys::Present { + extent: extent.range(0, extent.size - extent.size % self.slab_size), + }); } } - pub fn capacity(&self) -> &[Extent] { - &self.capacity + pub fn active_capacity_bytes(&self) -> u64 { + self.capacity + .iter() + .map(|extent_phys| match extent_phys { + SlabExtentPhys::Present { extent } => extent.size, + SlabExtentPhys::Removed { extent: _ } => 0, + }) + .sum() } #[allow(dead_code)] @@ -124,7 +173,7 @@ impl SlabAccess { pub fn slab_id_to_extent(&self, slab_id: SlabId) -> Extent { let inner = self.inner.read().unwrap(); let (&extent_slab, containing_extent) = inner - .capacity + .active_capacity .left_range((Unbounded, Included(slab_id))) .next_back() .unwrap(); @@ -136,7 +185,7 @@ impl SlabAccess { let inner = self.inner.read().unwrap(); let (&capacity_slab, capacity_extent) = inner - .capacity + .active_capacity .right_range((Unbounded, Included(extent.location))) .next_back() .unwrap(); @@ -145,7 +194,6 @@ impl SlabAccess { let slab_id = capacity_slab + ((extent.location - capacity_extent.location) / self.slab_size); - assert_lt!(slab_id.0, inner.num_slabs); debug_assert!(self.slab_id_to_extent(slab_id).contains(&extent)); slab_id } @@ -163,26 +211,63 @@ impl SlabAccess { let inner = self.inner.read().unwrap(); inner.num_slabs * self.slab_size } + + pub fn disk_capacity(&self, disk: DiskId) -> u64 { + let inner = self.inner.read().unwrap(); + inner + .active_capacity + .iter() + .filter(|(_, &extent)| extent.location.disk() == disk) + .map(|(_, &extent)| extent.size) + .sum() + } + + pub fn disk_num_slabs(&self, disk: DiskId) -> u64 { + self.disk_capacity(disk) / self.slab_size() + } } impl SlabAllocatorBuilder { pub fn new(phys: SlabAllocatorPhys) -> Self { - let mut num_slabs = 0; + let mut allocatable = HashSet::new(); + let mut active_capacity = BiBTreeMap::new(); + + let mut total_slabs = 0; + let mut removed_slabs = 0; let capacity = phys .capacity .into_iter() - .map(|extent| { - let start = SlabId(num_slabs); - num_slabs += extent.size / phys.slab_size; - (start, extent) + .map(|extent_phys| { + let start = SlabId(total_slabs); + match extent_phys { + SlabExtentPhys::Present { extent } => { + active_capacity.insert(start, extent); + let nslabs = extent.size / phys.slab_size; + allocatable.extend( + (total_slabs..(total_slabs + nslabs)) + .map(SlabId) + .collect::>(), + ); + total_slabs += nslabs; + } + SlabExtentPhys::Removed { extent } => { + let nslabs = extent.size / phys.slab_size; + removed_slabs += nslabs; + total_slabs += nslabs; + } + } + (start, extent_phys) }) .collect(); + Self { - allocatable: (0..num_slabs).map(SlabId).collect(), + allocatable, access: SlabAccess { inner: RwLock::new(SlabAccessInner { capacity, - num_slabs, + active_capacity, + num_slabs: total_slabs - removed_slabs, + next_slab_id: SlabId(total_slabs), }), slab_size: phys.slab_size, }, @@ -194,11 +279,28 @@ impl SlabAllocatorBuilder { assert!(removed, "{slab_id:?} claimed twice"); } - pub fn build(self) -> SlabAllocator { + pub fn build(self, 
removing_disks: &[DiskId]) -> SlabAllocator { + let mut noalloc_slabs = removing_disks + .iter() + .copied() + .map(|disk| (disk, Vec::new())) + .collect::>(); + let mut allocatable = Vec::new(); + for slab_id in self.allocatable.into_iter() { + let slab_disk = self.access.slab_id_to_extent(slab_id).location.disk(); + + noalloc_slabs + .get_mut(slab_disk) + .unwrap_or(&mut allocatable) + .push(slab_id); + } + SlabAllocator { inner: Mutex::new(Inner { - allocatable: self.allocatable.into_iter().collect(), + allocatable, freeing: Vec::new(), + noalloc_state: noalloc_slabs, + noallocing: Vec::new(), reserved_slabs: RESERVED_SLABS_PCT.apply(self.access.num_slabs()), }), access: self.access, @@ -240,10 +342,15 @@ impl SlabAllocator { let mut inner = self.inner.lock().unwrap(); let mut access_inner = self.access.inner.write().unwrap(); - let first_new_slab = SlabId(access_inner.num_slabs); + let first_new_slab = access_inner.next_slab_id; // capacity is aligned to be a multiple of slabsize let capacity = capacity.trim_end(capacity.size - capacity.size % self.access.slab_size); - access_inner.capacity.insert(first_new_slab, capacity); + access_inner + .capacity + .insert(first_new_slab, SlabExtentPhys::Present { extent: capacity }); + access_inner + .active_capacity + .insert(first_new_slab, capacity); let new_slabs = capacity.size / self.access.slab_size; // We don't want to allocate and write to the new capacity until the next checkpoint @@ -251,19 +358,100 @@ impl SlabAllocator { // we add the new slabs to `freeing`. inner .freeing - .extend((0..new_slabs).map(|n| SlabId(access_inner.num_slabs + n))); + .extend((0..new_slabs).map(|i| access_inner.next_slab_id + i)); access_inner.num_slabs += new_slabs; + access_inner.next_slab_id += new_slabs; + } + + pub fn remove_disk(&self, disk: DiskId) { + assert!(self.disk_is_fully_evacuated(disk)); + + let mut inner = self.inner.lock().unwrap(); + let mut access_inner_guard = self.access.inner.write().unwrap(); + let access_inner = &mut *access_inner_guard; + let removed_slabs = inner.noalloc_state.remove(disk).unwrap(); + + access_inner.active_capacity.retain(|&slab_id, &extent| { + if extent.location.disk() == disk { + let old = access_inner + .capacity + .insert(slab_id, SlabExtentPhys::Removed { extent }); + assert_eq!(old, Some(SlabExtentPhys::Present { extent })); + trace!( + "removal: removing {:?} from slab allocator capacity", + old.unwrap() + ); + false + } else { + true + } + }); + access_inner.num_slabs -= removed_slabs.len() as u64; + + // Dropping guard as writer so the assertions below can grab guard as reader in + // slab_id_to_extent(). + drop(access_inner_guard); + assert!(!inner.allocatable.iter().any(|&slab_id| self + .slab_id_to_extent(slab_id) + .location + .disk() + == disk)); + assert!(!inner.freeing.iter().any(|&slab_id| self + .slab_id_to_extent(slab_id) + .location + .disk() + == disk)); + assert!(!inner.noallocing.iter().any(|&slab_id| self + .slab_id_to_extent(slab_id) + .location + .disk() + == disk)); + } + + /// Marks all slabs that are part of `disk` as non-allocatable. No further allocations can be + /// made from these slabs and they won't be reused once freed. 
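slab_id_to_extent() above resolves a SlabId by finding the active capacity region whose first slab id is the greatest one not exceeding it, then offsetting into that region by whole slabs. A minimal sketch of that range lookup over a plain BTreeMap (toy u64 ids and offsets, not the real BiBTreeMap of Extents):

use std::collections::BTreeMap;
use std::ops::Bound::{Included, Unbounded};

const SLAB_SIZE: u64 = 32 << 20;

// Illustrative sketch: map a slab id to a byte offset via its capacity region.
fn slab_offset(active_capacity: &BTreeMap<u64, u64 /* region start offset */>, slab_id: u64) -> u64 {
    let (&region_first_slab, &region_offset) = active_capacity
        .range((Unbounded, Included(slab_id)))
        .next_back()
        .expect("slab id below the first region");
    region_offset + (slab_id - region_first_slab) * SLAB_SIZE
}

fn main() {
    // Two active regions: slabs 0..4 at offset 0, slabs 4..8 at offset 1 GiB.
    let mut active_capacity = BTreeMap::new();
    active_capacity.insert(0u64, 0u64);
    active_capacity.insert(4, 1 << 30);
    assert_eq!(slab_offset(&active_capacity, 1), SLAB_SIZE);
    assert_eq!(slab_offset(&active_capacity, 5), (1 << 30) + SLAB_SIZE);
}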
+ pub fn mark_noalloc_disk(&self, disk: DiskId) { + let mut guard = self.inner.lock().unwrap(); + let inner = &mut *guard; + let mut removing_allocatable = Vec::new(); + inner.allocatable.retain(|&slab_id| { + if self.slab_id_to_extent(slab_id).location.disk() == disk { + removing_allocatable.push(slab_id); + false + } else { + true + } + }); + inner.freeing.retain(|&slab_id| { + if self.slab_id_to_extent(slab_id).location.disk() == disk { + inner.noallocing.push(slab_id); + false + } else { + true + } + }); + let replaced = inner.noalloc_state.insert(disk, removing_allocatable); + assert!(replaced.is_none()); + } + + /// Marks all free slabs that are part of `disk` as allocatable. This also enables + /// the reuse of allocated slabs that are freed. + pub fn unmark_noalloc_disk(&self, disk: DiskId) { + let mut guard = self.inner.lock().unwrap(); + let inner = &mut *guard; + inner + .allocatable + .append(&mut inner.noalloc_state.remove(disk).unwrap()); + inner.freeing.append(&mut inner.noallocing); + inner.allocatable.shuffle(&mut thread_rng()); } pub fn get_phys(&self) -> SlabAllocatorPhys { + let access_inner = self.access.inner.read().unwrap(); SlabAllocatorPhys { slab_size: self.access.slab_size, - capacity: self - .access - .inner - .read() - .unwrap() + capacity: access_inner .capacity .iter() .map(|(_, &extent)| extent) @@ -291,15 +479,18 @@ impl SlabAllocator { /// regardless of how much metadata is actually used. pub fn set_reservation(&self, reserved_space: u64) { let mut inner = self.inner.lock().unwrap(); + let removing_slabs = inner.num_removing_slabs(&self.access); inner.reserved_slabs = max( reserved_space / self.access.slab_size, - RESERVED_SLABS_PCT.apply(self.access.num_slabs()), + RESERVED_SLABS_PCT.apply(self.access.num_slabs() - removing_slabs), ); } pub fn allocate_reserved(&self) -> SlabId { let mut inner = self.inner.lock().unwrap(); - if inner.allocatable.len() as u64 <= SUPER_RESERVED_SLABS_PCT.apply(self.access.num_slabs()) + let removing_slabs = inner.num_removing_slabs(&self.access); + if inner.allocatable.len() as u64 + <= SUPER_RESERVED_SLABS_PCT.apply(self.access.num_slabs() - removing_slabs) { panic!("Free slabs exhausted."); } @@ -310,11 +501,20 @@ impl SlabAllocator { pub fn free(&self, slab: SlabId) { trace!("freeing {slab:?}"); - self.inner.lock().unwrap().freeing.push(slab); + let extent_disk = self.access().slab_id_to_extent(slab).location.disk(); + let mut inner = self.inner.lock().unwrap(); + if inner.noalloc_state.contains_key(&extent_disk) { + inner.noallocing.push(slab); + } else { + inner.freeing.push(slab); + } } /// Returns the amount of non-reserved available space, in bytes. i.e. the amount that could /// be allocated by allocate(). + /// + /// Note: Slabs that are part of a disk that's been `mark_noalloc_disk()`-ed are not + /// allocatable, and therefore not included in returned value. pub fn allocatable_bytes(&self) -> u64 { let inner = self.inner.lock().unwrap(); (inner.allocatable.len() as u64).saturating_sub(inner.reserved_slabs) @@ -322,17 +522,23 @@ impl SlabAllocator { } /// Returns the number of slabs that are not currently allocated. This includes reserved and - /// super-reserved slabs. + /// super-reserved slabs and the ones marked as noalloc for removal. 
pub fn free_slabs(&self) -> u64 { let inner = self.inner.lock().unwrap(); - inner.allocatable.len() as u64 + let noalloc_slabs = inner + .noalloc_state + .values() + .map(|slabs| slabs.len()) + .sum::() as u64; + inner.allocatable.len() as u64 + noalloc_slabs } /// Returns the number of slabs that we would like the block allocator to evacuate and free. pub fn num_slabs_to_evacuate(&self) -> u64 { let inner = self.inner.lock().unwrap(); - let target_free_slabs = - inner.reserved_slabs + TARGET_AVAILABLE_SLABS_PCT.apply(self.access.num_slabs()); + let removing_slabs = inner.num_removing_slabs(&self.access); + let target_free_slabs = inner.reserved_slabs + + TARGET_AVAILABLE_SLABS_PCT.apply(self.access.num_slabs() - removing_slabs); let current_free_slabs = inner.allocatable.len() + inner.freeing.len(); target_free_slabs.saturating_sub(current_free_slabs as u64) } @@ -346,6 +552,14 @@ impl SlabAllocator { // `inner.allocatable` and `inner.freeing` below as "split borrows", allowing two // exclusive references to different fields of the same struct. inner.allocatable.append(&mut inner.freeing); + for slab_id in mem::take(&mut inner.noallocing) { + let slab_disk = self.slab_id_to_extent(slab_id).location.disk(); + inner + .noalloc_state + .get_mut(slab_disk) + .unwrap() + .push(slab_id); + } inner.allocatable.shuffle(&mut thread_rng()); } @@ -366,12 +580,50 @@ impl SlabAllocator { } pub fn num_slabs(&self) -> u64 { - self.access.num_slabs() + let removing_slabs = self.inner.lock().unwrap().num_removing_slabs(&self.access); + self.access.num_slabs() - removing_slabs } pub fn capacity(&self) -> u64 { self.access.capacity() } + + pub fn disk_capacity(&self, disk: DiskId) -> u64 { + self.access.disk_capacity(disk) + } + + pub fn disk_is_fully_evacuated(&self, disk: DiskId) -> bool { + let disk_num_slabs = self.access.disk_num_slabs(disk); + let noalloc_slabs = self + .inner + .lock() + .unwrap() + .noalloc_state + .get(disk) + .map(|slabs| slabs.len()) + .unwrap_or_default() as u64; + disk_num_slabs == noalloc_slabs + } + + pub fn removing_capacity(&self) -> u64 { + self.inner + .lock() + .unwrap() + .noalloc_state + .keys() + .map(|disk_id| self.disk_capacity(disk_id)) + .sum() + } + + pub fn removing_disks(&self) -> impl Iterator { + self.inner + .lock() + .unwrap() + .noalloc_state + .keys() + .collect::>() + .into_iter() + } } #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -389,6 +641,12 @@ impl Add for SlabId { } } +impl AddAssign for SlabId { + fn add_assign(&mut self, other: u64) { + *self = SlabId(self.0 + other); + } +} + impl Sub for SlabId { type Output = u64; fn sub(self, rhs: SlabId) -> u64 { @@ -401,3 +659,9 @@ impl From for usize { usize::from64(val.0) } } + +impl From for SlabId { + fn from(val: usize) -> Self { + SlabId(val as u64) + } +} diff --git a/cmd/zfs_object_agent/zettacache/src/space_map.rs b/cmd/zfs_object_agent/zettacache/src/space_map.rs index 8f09a0037aa1..7a4f5d147bc6 100644 --- a/cmd/zfs_object_agent/zettacache/src/space_map.rs +++ b/cmd/zfs_object_agent/zettacache/src/space_map.rs @@ -5,6 +5,7 @@ use futures::stream::StreamExt; use serde::Deserialize; use serde::Serialize; +use crate::base_types::DiskId; use crate::base_types::Extent; use crate::block_access::BlockAccess; use crate::block_allocator::SlabPhysType; @@ -131,4 +132,9 @@ impl SpaceMap { self.log.clear(); self.alloc_entries = 0; } + + // Returns the number of slabs moved. 
+ pub async fn transfer_data_for_removal(&mut self, disk: DiskId) -> u64 { + self.log.transfer_data_for_removal(disk).await + } } diff --git a/cmd/zfs_object_agent/zettacache/src/superblock.rs b/cmd/zfs_object_agent/zettacache/src/superblock.rs index 5133bd8d0668..a6cc4670bd39 100644 --- a/cmd/zfs_object_agent/zettacache/src/superblock.rs +++ b/cmd/zfs_object_agent/zettacache/src/superblock.rs @@ -57,7 +57,6 @@ impl DiskPhys { } } -/// State that's only needed on the primary disk (currently, always DiskId(0)). #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PrimaryPhys { pub checkpoint_id: CheckpointId, @@ -74,6 +73,8 @@ pub struct PrimaryPhys { /// Subset of PrimaryPhys that's needed to get the feature flags. #[derive(Deserialize, Debug, Clone)] pub struct PrimaryFeaturesPhys { + // The checkpoint_id is needed so we can pick the latest primary + checkpoint_id: CheckpointId, feature_flags: Vec, } @@ -92,19 +93,28 @@ impl PrimaryPhys { } } - /// Write superblocks to all disks. + pub async fn write( + &self, + primary_disk: DiskId, + cache_guid: CacheGuid, + block_access: &BlockAccess, + ) { + let phys = SuperblockPhys { + primary: Some(self.clone()), + disk: primary_disk, + cache_guid, + disk_guid: Some(self.disks.get(&primary_disk).unwrap().guid), + }; + phys.write(block_access, primary_disk).await; + } + + /// Write superblocks to all disks. Used during cache creation. pub async fn write_all( &self, primary_disk: DiskId, cache_guid: CacheGuid, block_access: &BlockAccess, ) { - // Write the non-primary disks first, so that newly-added disks will - // have their superblocks present before the primary superblock is - // updated to indicate that they are part of the cache. If we wrote all - // the disks (including the primary) at once, we could crash after the - // primary was updated but a new disk had not yet been updated. The - // cache would be left in an inconsistent state and could not be opened. block_access .disks() .filter(|&disk| disk != primary_disk) @@ -120,14 +130,7 @@ impl PrimaryPhys { .collect::>() .count() .await; - - let phys = SuperblockPhys { - primary: Some(self.clone()), - disk: primary_disk, - cache_guid, - disk_guid: Some(self.disks.get(&primary_disk).unwrap().guid), - }; - phys.write(block_access, primary_disk).await; + self.write(primary_disk, cache_guid, block_access).await; } pub async fn read_features(block_access: &BlockAccess) -> Result> { @@ -144,14 +147,14 @@ impl PrimaryPhys { let (mut primary, primary_disk, cache_guid) = results .iter() - .find_map(|result| { - if let Ok(phys) = result { - phys.primary - .as_ref() - .map(|primary| (primary.clone(), phys.disk, phys.cache_guid)) - } else { - None - } + .flatten() + .max_by_key(|phys| phys.primary.as_ref().map(|p| p.checkpoint_id)) + .map(|phys| { + ( + phys.primary.as_ref().unwrap().clone(), + phys.disk, + phys.cache_guid, + ) }) .ok_or_else(|| anyhow!("Primary Superblock not found"))?; @@ -164,17 +167,12 @@ impl PrimaryPhys { }) .collect::>(); - for (id, result) in results.iter().enumerate() { - // XXX proper error handling - // XXX we should be able to reorder them? 
- if let Ok(phys) = result { - let disk = DiskId::new(id); - assert_eq!(disk, phys.disk); - assert!(phys.primary.is_none() || phys.disk == primary_disk); - assert_eq!(phys.cache_guid, cache_guid); - if let Some(disk_guid) = phys.disk_guid { - assert_eq!(disk_guid, primary.disks.get(&disk).unwrap().guid); - } + // XXX proper error handling + for phys in results.iter().flatten() { + assert_eq!(phys.cache_guid, cache_guid); + assert!(phys.disk != primary_disk || phys.primary.is_some()); + if let Some(disk_guid) = phys.disk_guid { + assert_eq!(disk_guid, primary.disks.get(&phys.disk).unwrap().guid); } } let sector_size = block_access.round_up_to_sector::(1); @@ -225,7 +223,7 @@ impl SuperblockPhys { .await } - async fn write(&self, block_access: &BlockAccess, disk: DiskId) { + pub async fn write(&self, block_access: &BlockAccess, disk: DiskId) { maybe_die_with(|| format!("before writing {:#?}", self)); debug!("writing {:#?}", self); let raw = block_access.chunk_to_raw(EncodeType::Json, self); @@ -237,6 +235,7 @@ impl SuperblockPhys { DiskIoType::MaintenanceWrite, ) .await; + maybe_die_with(|| format!("after writing {:#?}", self)); } pub async fn dump_all(block_access: &BlockAccess) { @@ -268,33 +267,27 @@ impl SuperblockPhys { } impl PrimaryFeaturesPhys { - /// Return value is (Self, primary_disk, cache_guid, extra_disks) pub async fn read(block_access: &BlockAccess) -> Result { let results = SuperblockFeaturesPhys::read_all(block_access).await; let (primary, primary_disk, cache_guid) = results .iter() - .find_map(|result| { - if let Ok(phys) = result { - phys.primary - .as_ref() - .map(|primary| (primary.clone(), phys.disk, phys.cache_guid)) - } else { - None - } + .flatten() + .max_by_key(|phys| phys.primary.as_ref().map(|p| p.checkpoint_id)) + .map(|phys| { + ( + phys.primary.as_ref().unwrap().clone(), + phys.disk, + phys.cache_guid, + ) }) .ok_or_else(|| anyhow!("Primary Superblock not found"))?; - for (id, result) in results.iter().enumerate() { - // XXX proper error handling - // XXX we should be able to reorder them? 
- if let Ok(phys) = result { - assert_eq!(DiskId::new(id), phys.disk); - assert_eq!(phys.cache_guid, cache_guid); - assert!(phys.primary.is_none() || phys.disk == primary_disk); - } + // XXX proper error handling + for phys in results.iter().flatten() { + assert_eq!(phys.cache_guid, cache_guid); + assert!(phys.disk != primary_disk || phys.primary.is_some()); } - Ok(primary) } } diff --git a/cmd/zfs_object_agent/zettacache/src/zcachedb.rs b/cmd/zfs_object_agent/zettacache/src/zcachedb.rs index 4f734a2a34f4..0a95d5cf9b09 100644 --- a/cmd/zfs_object_agent/zettacache/src/zcachedb.rs +++ b/cmd/zfs_object_agent/zettacache/src/zcachedb.rs @@ -1,5 +1,7 @@ +use std::collections::BTreeMap; use std::path::PathBuf; +use crate::base_types::DiskId; use crate::zettacache::zcdb::ZCacheDBHandle; use crate::CacheOpenMode; @@ -138,7 +140,7 @@ impl ZettaCacheDBCommand { async fn issue_pool_state_command( command: ZettaCacheDBCommand, - paths: Vec, + paths: BTreeMap, ) -> Result<(), anyhow::Error> { let mut handle = ZCacheDBHandle::open(paths).await?; match command { diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs index 2b85340351a0..f09994227a42 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs @@ -1,5 +1,6 @@ pub mod merge; pub mod remap; +pub mod removal; pub mod zcdb; use std::collections::btree_map; @@ -33,6 +34,7 @@ use futures::FutureExt; use log::*; use lru::LruCache; use more_asserts::*; +use rand::seq::IteratorRandom; use rand::Rng; use serde::Deserialize; use serde::Serialize; @@ -72,6 +74,7 @@ use self::merge::MergeMessage; use self::merge::MergeProgressPhys; use self::merge::MergeState; use self::remap::RemapState; +use self::removal::DeviceRemoval; use crate::atime_histogram::AtimeHistogram; use crate::atime_histogram::AtimeHistogramPhys; use crate::base_types::*; @@ -95,6 +98,7 @@ use crate::slab_allocator::SlabAllocatorPhys; use crate::slab_allocator::RESERVED_SLABS_PCT; use crate::superblock::DiskPhys; use crate::superblock::PrimaryPhys; +use crate::superblock::SuperblockPhys; use crate::superblock::SUPERBLOCK_SIZE; use crate::CacheOpenMode; @@ -152,6 +156,13 @@ tunable! { static ref CORRUPTION_FILL: u8 = 0x31; // fill blocks with '1' } +// Max number of merge cycles needed for a device to be removed given that it no longer has +// any user data. This should be at most 2 which is the worst case where we started a removal +// mid-way of a merge and the device had no user data (e.g. data in the block allocator). In +// that case we start counting our merges and the first merge doesn't really count because +// removal started midway so the index metadata was not fully moved away from the device. +const REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION: u8 = 2; + /// A PendingChange is the in-core data structure for tracking changes to the index between merges. /// Two types of events are tracked: insertions and lookup hits (atime update). Note that there is /// no removal event here because we do not allow removes in general. Cache content is only removed @@ -219,7 +230,6 @@ struct Locked { block_access: Arc, primary: PrimaryPhys, guid: CacheGuid, - primary_disk: DiskId, block_allocator: BlockAllocator, pending_changes: PendingChanges, pending_changes_trigger: usize, @@ -240,6 +250,7 @@ struct Locked { // This is needed to ensure that writes complete before we complete the next // checkpoint, so that they are persisted to disk. 
outstanding_writes: ConcurrentBatch, + device_removal: DeviceRemoval, atime: Atime, stats: Arc, @@ -398,6 +409,32 @@ impl ZettaCache { } } + pub async fn remove_disk(&self, path: &Path) -> Result<()> { + match &*self.inner.load() { + Some(inner) => inner.remove_disk(path).await, + None => Err(anyhow!("disk {path:?} is not part of the zettacache")), + } + } + + pub async fn cancel_disk_removal(&self, path: &Path) -> Result<()> { + match &*self.inner.load() { + Some(inner) => inner.cancel_disk_removal(path).await, + None => Err(anyhow!("disk {path:?} is not part of the zettacache")), + } + } + + pub async fn pause_disk_removals(&self) { + if let Some(inner) = &*self.inner.load() { + inner.pause_disk_removals().await; + } + } + + pub async fn resume_disk_removals(&self) { + if let Some(inner) = &*self.inner.load() { + inner.resume_disk_removals().await; + } + } + pub async fn initiate_merge(&self) { if let Some(inner) = &*self.inner.load() { inner.initiate_merge().await; @@ -458,11 +495,12 @@ impl ZettaCache { impl Inner { async fn create(paths: Vec) -> Result<()> { - let mut disks: Vec = Vec::with_capacity(paths.len()); - for path in paths { - disks.push(Disk::new(&path, false)?); - } - let block_access = BlockAccess::new(disks, false); + let paths = paths + .into_iter() + .enumerate() + .map(|(id, path)| (DiskId::new(id), path)) + .collect(); + let block_access = BlockAccess::new(&paths, false)?; let guid = CacheGuid::new(); @@ -490,8 +528,10 @@ impl Inner { ), merge_progress: None, + device_removal: Default::default(), }; - let slab_allocator = SlabAllocatorBuilder::new(checkpoint.slab_allocator.clone()).build(); + let slab_allocator = + SlabAllocatorBuilder::new(checkpoint.slab_allocator.clone()).build(&Vec::new()); let checkpoint_extents = checkpoint.write(&block_access, &slab_allocator).await; PrimaryPhys::new( block_access @@ -548,25 +588,21 @@ impl Inner { if paths.is_empty() { return Err(CacheOpenError::NoDevices); } - let mut disks: Vec = Vec::with_capacity(paths.len()); - for path in &paths { - disks.push(Disk::new(path, false)?); - } - let block_access = Arc::new(BlockAccess::new(disks, false)); + let block_access = Arc::new(BlockAccess::new(&paths, false)?); let feature_flags = match PrimaryPhys::read_features(&block_access).await { Ok(f) => f, Err(_) => { // XXX need proper create CLI - Self::create(paths.clone()).await?; + Self::create(paths.values().cloned().collect()).await?; PrimaryPhys::read_features(&block_access).await.unwrap() } }; - check_features(&feature_flags) - .map_err(|e| CacheOpenError::IncompatibleFeatures(paths, e))?; + check_features(&feature_flags).map_err(|e| { + CacheOpenError::IncompatibleFeatures(paths.values().cloned().collect(), e) + })?; - let (mut primary, primary_disk, guid, extra_disks) = - PrimaryPhys::read(&block_access).await.unwrap(); + let (mut primary, _, guid, extra_disks) = PrimaryPhys::read(&block_access).await.unwrap(); let mut checkpoint = CheckpointPhys::read(&block_access, &primary.checkpoint).await?; assert_eq!(checkpoint.id, primary.checkpoint_id); @@ -640,7 +676,7 @@ impl Inner { .await; block_builder.claim(&mut slab_builder); - let slab_allocator = Arc::new(slab_builder.build()); + let slab_allocator = Arc::new(slab_builder.build(&checkpoint.device_removal.pending)); let block_allocator = block_builder.build(slab_allocator.clone()); let operation_log = BlockBasedLog::open( @@ -742,10 +778,10 @@ impl Inner { size_histogram: checkpoint.size_histogram, operation_log, primary, - primary_disk, guid, outstanding_reads: Default::default(), 
outstanding_writes: Default::default(), + device_removal: DeviceRemoval::open(checkpoint.device_removal), atime: checkpoint.last_atime, block_allocator, slab_allocator, @@ -1005,6 +1041,7 @@ impl Inner { let mut locked = lock_non_send_measured!(&self.locked).await; locked.rotate_index(&mut indices.old, new_index); locked.block_allocator.rebalance_fini(); + locked.device_removal.complete_merge_cycle(); indices.new = None; merging = None; completed_merge = true; @@ -1491,7 +1528,7 @@ impl Inner { } async fn add_disk(&self, path: &Path) -> Result<()> { - lock_non_send_measured!(&self.locked).await.add_disk(path)?; + lock_measured!(&self.locked).await.add_disk(path).await?; self.sync_checkpoint().await; Ok(()) } @@ -1505,6 +1542,28 @@ impl Inner { Ok(additional_bytes) } + async fn remove_disk(&self, path: &Path) -> Result<()> { + self.locked.lock().await.remove_disk(path)?; + self.sync_checkpoint().await; + Ok(()) + } + + async fn cancel_disk_removal(&self, path: &Path) -> Result<()> { + self.locked.lock().await.cancel_disk_removal(path)?; + self.sync_checkpoint().await; + Ok(()) + } + + async fn pause_disk_removals(&self) { + self.locked.lock().await.pause_disk_removals(); + self.sync_checkpoint().await; + } + + async fn resume_disk_removals(&self) { + self.locked.lock().await.resume_disk_removals(); + self.sync_checkpoint().await; + } + async fn initiate_merge(&self) { lock_non_send_measured!(&self.locked).await.request_merge(); self.sync_checkpoint().await; @@ -1808,6 +1867,75 @@ impl Locked { completed_merge: bool, pool_guids: PoolGuidMappingPhys, ) { + if let Some(removal_entry) = self.device_removal.removing_disk_entry() { + let disk = removal_entry.disk; + + // The stages of the removal are the following: + // + // [1] We wait for the merge logic to evict enough blocks so that there's sufficient + // space available to the block allocator to move all blocks off the target disk. + // + // [2] Once we have enough free space to start the evacuation for the first disk + // being removed, we issue a rebalance in the block allocator to move its data to + // non-removing disks. + // + // [3] Once that evacuation is done we want to move any metadata away from that disk + // so that its fully evacuated. The first piece of metadata that we move is anything + // that's not refreshed every merge cycle by the index (currently this means only the + // block allocator's spacemaps) - we explicitly move those. + // + // [4] Once that is done, we have the expecation that all other metadata (index, + // operation_log, etc..) will evacuate implicitly over the next couple of merge cyles + // as we free their old data which may reside in the removing disks and allocate new + // ones in allocatable disks. Since we don't have explict control of the merge tasks + // that we spawn from here, we bound this removal stage to a specific number of + // cycles (see REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION). If that bound is ever + // crossed then we panic as it is implied that the removal is not making progress. + // + // [5] Once the disk is fully evacuated we remove it from our in-memory structures + // which will later be flushed to disk, effectively finishing the removal of said + // disk. + // + // XXX: The REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION mechanism is not ideal for + // now as we don't store the progress that we've made in terms of merge cycles + // on disk for each removal. 
This means that if the agent restarts we'll have + // to wait for another N merges before detecting that we aren't making progress. + if !self.block_allocator.disk_is_pending_remap(disk) { + if !self.slab_allocator.disk_is_fully_evacuated(disk) { + match removal_entry.merge_cycles_left { + Some(cycles_left) => { + assert_gt!( + cycles_left, + 0, + "took more than {} merge cycles to remove the devices metadata", + REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION + ); + } + None => { + let slabs_moved = self + .block_allocator + .transfer_metadata_for_removal(disk) + .await; + removal_entry.merge_cycles_left = + Some(REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION); + info!( + "removal: {disk:?}: {slabs_moved} metadata slabs moved; awaiting at most {} merge cycles", + REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION + ); + } + } + } else { + self.slab_allocator.remove_disk(disk); + self.block_allocator.remove_disk(disk); + self.block_access.remove_disk(disk); + self.device_removal.complete_removal(disk); + let removed = self.primary.disks.remove(&disk); + assert!(removed.is_some()); + info!("removal: completed for {disk:?}"); + } + } + } + debug!( "flushing checkpoint {:?}", self.primary.checkpoint_id.next() @@ -1853,6 +1981,7 @@ impl Locked { block_allocator: self.block_allocator.flush(completed_merge).await, size_histogram: self.size_histogram.clone(), merge_progress: merge_progress_phys, + device_removal: self.device_removal.to_phys(), }; let checkpoint_extents = checkpoint @@ -1865,9 +1994,17 @@ impl Locked { } self.primary.checkpoint_id = checkpoint.id; self.primary.feature_flags = SUPPORTED_FEATURES.keys().cloned().collect(); - // We need to write all the disks' superblocks in case new disks have been added. + + // The following call is the last step on saving the cache's state. Any crash that + // happens after this and up until the next checkpoint, will have us restart from the + // state recorded here. + let primary_disk = self + .block_access + .disks() + .choose(&mut rand::thread_rng()) + .unwrap(); self.primary - .write_all(self.primary_disk, self.guid, &self.block_access) + .write(primary_disk, self.guid, &self.block_access) .await; self.slab_allocator.set_reservation( @@ -1942,24 +2079,67 @@ impl Locked { fn space_to_evict(&self) -> u64 { let allocatable_from_slabs = self.slab_allocator.allocatable_bytes(); - let allocatable_from_blocks = self.block_allocator.available(); - let target_allocatable = TARGET_FREE_BLOCKS_PCT.apply(self.slab_allocator.capacity()); + let allocatable_from_blocks = self.block_allocator.allocatable_bytes(); + let slab_allocator_capacity = + self.slab_allocator.capacity() - self.slab_allocator.removing_capacity(); + + // If we are removing devices make sure that we have enough free space evacuate their + // data to the rest of the cache, on top of our TARGET_FREE_BLOCKS_PCT. 
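// Illustrative sketch (not part of this patch): the eviction sizing that the comment
// above describes reduces to saturating arithmetic over byte counts. The free-standing
// function and its parameter names below are hypothetical; `target_free_pct` stands in
// for TARGET_FREE_BLOCKS_PCT and is applied as a plain integer percentage rather than
// through the tunable's apply() helper.
fn space_to_evict_sketch(
    capacity: u64,               // total slab-allocator capacity, in bytes
    removing_capacity: u64,      // capacity of the disks queued for removal
    bytes_to_relocate: u64,      // bytes still allocated on the removing disks
    allocatable_from_slabs: u64, // free, non-reserved slab space
    allocatable_from_blocks: u64, // free space inside already-allocated slabs
    target_free_pct: u64,        // e.g. TARGET_FREE_BLOCKS_PCT as a percentage
) -> u64 {
    // Removing disks no longer count toward the capacity the free target is sized against.
    let active_capacity = capacity - removing_capacity;
    // Aim for the usual free target plus enough room to absorb the relocated data.
    let target_allocatable = active_capacity / 100 * target_free_pct + bytes_to_relocate;
    target_allocatable
        .saturating_sub(allocatable_from_slabs)
        .saturating_sub(allocatable_from_blocks)
}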
+ let bytes_to_relocate = self.block_allocator.removing_bytes(); + + let target_allocatable = + TARGET_FREE_BLOCKS_PCT.apply(slab_allocator_capacity) + bytes_to_relocate; let reduction = target_allocatable .saturating_sub(allocatable_from_slabs) .saturating_sub(allocatable_from_blocks); info!( - "want to evict {} of allocated blocks ({} allocatable slabs; {} allocatable blocks; {} target; {} freeing; {} histogram)", + "want to evict {} of allocated blocks ({} allocatable slabs; {} allocatable blocks; {} target; {} histogram)", nice_p2size(reduction), nice_p2size(allocatable_from_slabs), nice_p2size(allocatable_from_blocks), nice_p2size(target_allocatable), - nice_p2size(self.block_allocator.freeing()), nice_p2size(self.atime_histogram.sum_live()), ); reduction } + /// Returns true if there is space to evacuate the allocated space of the first removing + /// device plus some slop space. Otherwise, false. + fn removal_can_evacuate(&self) -> bool { + if self.device_removal.paused { + return false; + } + + match self.device_removal.removing_disk() { + Some(disk) => { + if !self.block_allocator.disk_is_pending_remap(disk) { + return false; + } + + let space_to_evacuate = self.block_allocator.disk_allocated_bytes(disk); + if space_to_evacuate == 0 { + return false; + } + + let allocatable_space = self.slab_allocator.allocatable_bytes() + + self.block_allocator.allocatable_bytes(); + let slab_allocator_capacity = + self.slab_allocator.capacity() - self.slab_allocator.removing_capacity(); + let slop = TARGET_FREE_BLOCKS_PCT.apply(slab_allocator_capacity); + + debug!( + "removal: {} to evacuate (plus slop: {}); {} available", + nice_p2size(space_to_evacuate), + nice_p2size(slop), + nice_p2size(allocatable_space) + ); + allocatable_space > (space_to_evacuate + slop) + } + None => false, + } + } + fn need_merge(&self) -> bool { let mut need_merge = false; @@ -1983,6 +2163,16 @@ impl Locked { } } + { + if self.removal_can_evacuate() { + debug!( + "starting merge due to data evacuation of {:?}", + self.device_removal.removing_disk().unwrap() + ); + need_merge = true; + } + } + { let slabs = self.slab_allocator.num_slabs_to_evacuate(); if slabs > EVACUATION_MIN_BATCH_PCT.apply(self.slab_allocator.num_slabs()) { @@ -1991,6 +2181,17 @@ impl Locked { } } + { + // If we've evacuated the disk's user data (i.e. block_allocator slabs) and misc + // metadata explicitly but there is no need for us to start a merge (e.g. no need to + // evict space) make sure that we force a merge anyway to evacuate any index metadata + // that are part of the removing disk. 
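// Illustrative sketch (not part of this patch): the check that follows consults a bounded
// countdown. Once a removing disk's user data and miscellaneous metadata are gone, a
// counter is armed at REMOVAL_MAX_MERGES_FOR_METADATA_EVACUATION and decremented after
// each merge; exhausting it before the disk is fully evacuated means removal is not making
// progress. The struct below is a hypothetical stand-in for that bookkeeping.
struct MetadataEvacuationSketch {
    merge_cycles_left: Option<u8>,
}

impl MetadataEvacuationSketch {
    // Arm the countdown once the explicit metadata transfer has been issued.
    fn arm(&mut self, max_merges: u8) {
        assert!(self.merge_cycles_left.is_none());
        self.merge_cycles_left = Some(max_merges);
    }

    // While armed, keep forcing merges so the index rewrites itself off the disk.
    fn needs_forced_merge(&self) -> bool {
        self.merge_cycles_left.is_some()
    }

    // Called once per completed merge cycle; panics if the budget is exhausted.
    fn complete_merge_cycle(&mut self) {
        if let Some(left) = self.merge_cycles_left.as_mut() {
            *left = left.checked_sub(1).expect("removal made no progress");
        }
    }
}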
+ if self.device_removal.need_index_evacuation() { + info!("starting merge due to pending removal"); + need_merge = true; + } + } + if self.merge_requested { debug!("starting merge due to user request"); need_merge = true; @@ -2043,7 +2244,12 @@ impl Locked { ) .await; - let remap = match self.block_allocator.rebalance_init() { + let removing_disk = if self.removal_can_evacuate() { + self.device_removal.removing_disk() + } else { + None + }; + let remap = match self.block_allocator.rebalance_init(removing_disk) { None => None, Some(map) => { // We need to ensure that rebalance() won't copy from blocks that we're still in @@ -2179,14 +2385,14 @@ impl Locked { self.stats .track_instantaneous(SlabCapacity, self.slab_allocator.capacity()); self.stats - .track_instantaneous(AvailableBlocksSize, self.block_allocator.available()); + .track_instantaneous(FreeBlocksSize, self.block_allocator.free_bytes()); self.stats.track_instantaneous( - AvailableSlabsSize, + FreeSlabsSize, self.slab_allocator.free_slabs() * self.slab_allocator.slab_size(), ); self.stats.track_instantaneous( AvailableSpace, - self.block_allocator.available() + self.slab_allocator.allocatable_bytes(), + self.block_allocator.allocatable_bytes() + self.slab_allocator.allocatable_bytes(), ); let old_pending = match &self.merge { Some(ms) => ms.old_pending_changes.len() as u64, @@ -2198,7 +2404,7 @@ impl Locked { ); } - fn add_disk(&mut self, path: &Path) -> Result<()> { + async fn add_disk(&mut self, path: &Path) -> Result<()> { // We hold the state lock across all these operations to ensure that we're always // adding the last DiskId to the SlabAllocator and Primary, in the case of concurrent // calls to add_disk(). @@ -2210,11 +2416,20 @@ impl Locked { .disk_extent(disk_id) .trim_start(SUPERBLOCK_SIZE), ); + self.block_allocator.add_disk(disk_id); self.primary .disks .insert(disk_id, DiskPhys::new(self.block_access.disk_size(disk_id))); + let superblock = SuperblockPhys { + primary: None, + disk: disk_id, + cache_guid: self.guid, + disk_guid: Some(self.primary.disks.get(&disk_id).unwrap().guid), + }; + superblock.write(&self.block_access, disk_id).await; + // The hit data isn't accurate across cache size changes, so clear // it, which also updates the histogram parameters to reflect the // new cache size. @@ -2227,6 +2442,11 @@ impl Locked { // Returns the amount of additional space, in bytes. fn expand_disk(&mut self, path: &Path) -> Result { let disk = self.block_access.path_to_disk_id(path)?; + + if self.device_removal.disk_is_pending_removal(disk) { + return Err(anyhow!("Cannot expand {path:?} - disk is being removed")); + } + let response = self.block_access.expand_disk(disk)?; if response.additional_bytes > 0 { let phys = self.primary.disks.get_mut(&disk).unwrap(); @@ -2247,6 +2467,63 @@ impl Locked { Ok(response) } + fn remove_disk(&mut self, path: &Path) -> Result<()> { + let begin = Instant::now(); + + let disk = self.block_access.path_to_disk_id(path)?; + if self.device_removal.disk_is_pending_removal(disk) { + return Err(anyhow!("{path:?} is already being removed")); + } + + // TODO: DLPX-81553 + // If we get rid of all the devices in the cache we should make sure that + // all the threads related to Inner and Locked together with the references + // that they contain to various structs are dropped correctly and we don't + // leak anything. 
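// Illustrative sketch (not part of this patch): the removal queued below proceeds through
// the stages [1]-[5] described in the Locked::flush_checkpoint() comment earlier in this
// patch. A hypothetical summary of that lifecycle as a state machine:
enum RemovalStageSketch {
    // [1] wait for eviction to free enough space to absorb the disk's data
    WaitingForFreeSpace,
    // [2] a block-allocator rebalance moves user data off the disk
    EvacuatingBlocks,
    // [3] non-index metadata (e.g. spacemaps) is transferred explicitly
    EvacuatingMiscMetadata,
    // [4] a bounded number of merge cycles rewrites the index off the disk
    AwaitingIndexRewrites { merge_cycles_left: u8 },
    // [5] the disk is dropped from the in-memory structures and the superblock
    Done,
}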
+ if self.device_removal.disks_in_queue() + 1 == self.block_access.disks().count() { + return Err(anyhow!("removal of all disks is not allowed")); + } + + self.slab_allocator.mark_noalloc_disk(disk); + self.block_allocator.mark_noalloc_disk(disk); + self.device_removal.add_to_queue(disk); + + info!( + "issued removal of {path:?} in {}ms", + begin.elapsed().as_millis(), + ); + Ok(()) + } + + fn cancel_disk_removal(&mut self, path: &Path) -> Result<()> { + let begin = Instant::now(); + + let disk = self.block_access.path_to_disk_id(path)?; + if !self.device_removal.disk_is_pending_removal(disk) { + return Err(anyhow!("{path:?} not currently being removed")); + } + + self.slab_allocator.unmark_noalloc_disk(disk); + self.block_allocator.unmark_noalloc_disk(disk); + self.device_removal.remove_from_queue(disk); + + info!( + "cancelled removal of {path:?} in {}ms", + begin.elapsed().as_millis(), + ); + Ok(()) + } + + fn pause_disk_removals(&mut self) { + self.device_removal.paused = true; + info!("pausing removals"); + } + + fn resume_disk_removals(&mut self) { + self.device_removal.paused = false; + info!("resuming removals"); + } + fn request_merge(&mut self) { self.merge_requested = true; } diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/removal.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/removal.rs new file mode 100644 index 000000000000..7bbab455748d --- /dev/null +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/removal.rs @@ -0,0 +1,118 @@ +use log::info; +use serde::Deserialize; +use serde::Serialize; + +use crate::base_types::DiskId; + +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +pub struct DeviceRemovalPhys { + pub pending: Vec, + paused: bool, +} + +pub struct DeviceRemovalEntry { + pub disk: DiskId, + // Indicates the number of merge cycles that we expect for the removal of all index-related + // metadata from the removing device. Value is None if we are still waiting for the removal + // of BlockAllocator's data and miscellaneous metadata not related to the index (see comment + // in Locked::flush_checkpoint). + pub merge_cycles_left: Option, +} + +pub struct DeviceRemoval { + pending: Vec, + pub paused: bool, +} + +impl DeviceRemoval { + pub fn open(phys: DeviceRemovalPhys) -> Self { + DeviceRemoval { + pending: phys + .pending + .iter() + .map(|&disk| DeviceRemovalEntry { + disk, + merge_cycles_left: None, + }) + .collect(), + paused: phys.paused, + } + } + + pub fn to_phys(&self) -> DeviceRemovalPhys { + DeviceRemovalPhys { + pending: self + .pending + .iter() + .map(|removal_entry| removal_entry.disk) + .collect(), + paused: self.paused, + } + } + + /// Return the DiskId that we are actively removing. + pub fn removing_disk(&self) -> Option { + self.pending.get(0).map(|removal_entry| removal_entry.disk) + } + + /// Return the DeviceRemovalEntry of the disk we are actively removing. 
+ pub fn removing_disk_entry(&mut self) -> Option<&mut DeviceRemovalEntry> { + self.pending.get_mut(0) + } + + pub fn disks_in_queue(&self) -> usize { + self.pending.len() + } + + pub fn add_to_queue(&mut self, disk: DiskId) { + self.pending.push(DeviceRemovalEntry { + disk, + merge_cycles_left: None, + }); + } + + pub fn remove_from_queue(&mut self, disk: DiskId) { + let pos = self + .pending + .iter() + .position(|removal_entry| removal_entry.disk == disk) + .unwrap(); + self.pending.remove(pos); + } + + pub fn disk_is_pending_removal(&self, disk: DiskId) -> bool { + self.pending + .iter() + .any(|removal_entry| removal_entry.disk == disk) + } + + /// Returns true if we are in the last stage of removal where we are waiting for + /// the index metadata to be evacuated from our currently removing disk. False + /// otherwise. + pub fn need_index_evacuation(&self) -> bool { + if let Some(removal_entry) = self.pending.get(0) { + return removal_entry.merge_cycles_left.is_some(); + } + false + } + + /// Called at the completion of each merge cycle, we decrement the `merge_cycles_left` + /// counter of the removal in progress as a sign for tracking our progress (see comment in + /// Locked::flush_checkpoint). + pub fn complete_merge_cycle(&mut self) { + if let Some(removal_entry) = self.pending.get_mut(0) { + if let Some(cycles_left) = removal_entry.merge_cycles_left.as_mut() { + *cycles_left = cycles_left.checked_sub(1).unwrap(); + info!( + "removal: {cycles_left} merge cycles left for {:?}", + removal_entry.disk + ) + } + } + } + + pub fn complete_removal(&mut self, disk: DiskId) { + let removal_entry = self.pending.remove(0); + assert_eq!(removal_entry.disk, disk) + } +} diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/zcdb.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/zcdb.rs index b999ba1e5ace..37de2d9e0602 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/zcdb.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/zcdb.rs @@ -1,17 +1,16 @@ +use std::collections::BTreeMap; use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use futures::StreamExt; use util::nice_p2size; -use util::writeln_stderr; use util::writeln_stdout; use super::CheckpointPhys; use crate::base_types::CacheGuid; use crate::base_types::DiskId; use crate::block_access::BlockAccess; -use crate::block_access::Disk; use crate::block_allocator::zcdb::zcachedb_dump_slabs; use crate::block_allocator::zcdb::zcachedb_dump_spacemaps; use crate::features::check_features; @@ -33,32 +32,19 @@ pub struct ZCacheDBHandle { } impl ZCacheDBHandle { - pub async fn dump_superblocks(paths: Vec) -> Result<()> { - let mut disks: Vec = Vec::with_capacity(paths.len()); - for path in paths { - match Disk::new(&path, true) { - Ok(disk) => disks.push(disk), - Err(err) => writeln_stderr!("error: {}", err), - } - } - if disks.is_empty() { - return Ok(()); - } - let block_access = BlockAccess::new(disks, true); + pub async fn dump_superblocks(paths: BTreeMap) -> Result<()> { + let block_access = BlockAccess::new(&paths, true)?; SuperblockPhys::dump_all(&block_access).await; Ok(()) } - pub async fn open(paths: Vec) -> Result { - let mut disks: Vec = Vec::with_capacity(paths.len()); - for path in &paths { - disks.push(Disk::new(path, true)?); - } - let block_access = Arc::new(BlockAccess::new(disks, true)); + pub async fn open(paths: BTreeMap) -> Result { + let block_access = Arc::new(BlockAccess::new(&paths, true)?); let feature_flags = PrimaryPhys::read_features(&block_access).await?; - 
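// Illustrative sketch (not part of this patch): open() and dump_superblocks() above now
// take paths keyed by DiskId instead of a positional Vec<PathBuf>, presumably so that
// disk numbering stays stable once disks can be removed from the middle of the set. A
// caller that still holds an ordered list of paths (as at cache creation in Inner::create)
// can build the map as sketched below; plain usize stands in for DiskId to keep the
// example self-contained.
use std::collections::BTreeMap;
use std::path::PathBuf;

fn paths_to_map_sketch(paths: Vec<PathBuf>) -> BTreeMap<usize, PathBuf> {
    // Mirrors Inner::create, which wraps the positional index in DiskId::new().
    paths.into_iter().enumerate().collect()
}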
check_features(&feature_flags) - .map_err(|e| CacheOpenError::IncompatibleFeatures(paths, e))?; + check_features(&feature_flags).map_err(|e| { + CacheOpenError::IncompatibleFeatures(paths.values().cloned().collect(), e) + })?; let (primary, primary_disk, guid, _extra_disks) = PrimaryPhys::read(&block_access).await?; let checkpoint = Arc::new(CheckpointPhys::read(&block_access, &primary.checkpoint).await?); @@ -84,13 +70,7 @@ impl ZCacheDBHandle { writeln_stdout!(" Primary {:?}, {:?}", self.primary_disk, self.guid); writeln_stdout!(); - let slabs_capacity = self - .checkpoint - .slab_allocator - .capacity() - .iter() - .map(|extent| extent.size) - .sum(); + let slabs_capacity = self.checkpoint.slab_allocator.active_capacity_bytes(); writeln_stdout!("Slabs Region: {}", nice_p2size(slabs_capacity)); writeln_stdout!("-------------------------------"); let mut total_used_bytes = 0; diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index 906d2bcb5721..9285f40c2b8d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -122,6 +122,19 @@ impl RootConnectionState { server.register_handler(TYPE_CLEAR_HIT_DATA, Box::new(Self::clear_hit_data)); server.register_handler(TYPE_ADD_DISK, Box::new(Self::add_disk)); server.register_handler(TYPE_EXPAND_DISK, Box::new(Self::expand_disk)); + server.register_handler(TYPE_REMOVE_DISK, Box::new(Self::remove_disk)); + server.register_handler( + TYPE_CANCEL_DISK_REMOVAL, + Box::new(Self::cancel_disk_removals), + ); + server.register_handler( + TYPE_PAUSE_DISK_REMOVALS, + Box::new(Self::pause_disk_removals), + ); + server.register_handler( + TYPE_RESUME_DISK_REMOVALS, + Box::new(Self::resume_disk_removals), + ); server.register_handler(TYPE_SYNC_CHECKPOINT, Box::new(Self::sync_checkpoint)); server.register_handler(TYPE_INITIATE_MERGE, Box::new(Self::initiate_merge)); server.register_struct_handler(MessageType::ReadBlock, Box::new(Self::read_block)); @@ -606,6 +619,54 @@ impl RootConnectionState { })) } + fn remove_disk(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + let request: RemoveDiskRequest = nvpair::from_nvlist(&nvl)?; + debug!("got {:?}", request); + + let result = cache + .remove_disk(&request.path) + .await + .map_err(FailureMessage::new); + return_result((), result, true) + })) + } + + fn cancel_disk_removals(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + let request: RemoveDiskRequest = nvpair::from_nvlist(&nvl)?; + debug!("got {:?}", request); + + let result = cache + .cancel_disk_removal(&request.path) + .await + .map_err(FailureMessage::new); + return_result((), result, true) + })) + } + + fn pause_disk_removals(&mut self, _nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + debug!("got pause_disk_removals"); + + cache.pause_disk_removals().await; + return_ok((), true) + })) + } + + fn resume_disk_removals(&mut self, _nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + debug!("got resume_disk_removals"); + + cache.resume_disk_removals().await; + return_ok((), true) + })) + } + fn sync_checkpoint(&mut self, nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move {