From dbc6af6116edd8152500e72938aebd87868445a6 Mon Sep 17 00:00:00 2001
From: Matthew Ahrens
Date: Mon, 13 Dec 2021 12:54:38 -0800
Subject: [PATCH] Primary Superblock (#64)

Change the way we store the superblock on multiple disks.  Now, one of
the disks is designated as the "primary", which has the single source
of truth about cache-wide info (checkpoint ID, checkpoint location,
etc).  The other disks only have identifying information (disk ID,
cache ID).  This reorganization should make it simpler to add disks.

Also, move the superblock handling code to its own file.
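For orientation, the on-disk relationship this patch establishes can be condensed as follows. This is only a summary sketch of the two structures added in superblock.rs below (that file is authoritative); the comments restate what the diff itself says about which disk stores what.

```rust
// Every disk reserves a small (4 KiB) superblock region at offset 0 holding a
// SuperblockPhys that merely identifies the disk within the cache:
//   - its own DiskId
//   - the cache-wide guid ("cache ID")
// Exactly one disk, the primary (currently always DiskId(0)), additionally
// embeds a PrimaryPhys: the single source of truth for cache-wide state
// (checkpoint ID, checkpoint location, checkpoint capacity, number of disks).
struct SuperblockPhys {
    primary: Option<PrimaryPhys>, // Some(..) on the primary disk, None elsewhere
    disk: DiskId,
    guid: u64,
}
```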
---
 .../zettacache/src/block_allocator.rs              |   2 +-
 cmd/zfs_object_agent/zettacache/src/lib.rs         |   1 +
 .../zettacache/src/superblock.rs                   | 115 +++++++++
 .../zettacache/src/zettacache.rs                   | 221 ++++--------------
 4 files changed, 164 insertions(+), 175 deletions(-)
 create mode 100644 cmd/zfs_object_agent/zettacache/src/superblock.rs

diff --git a/cmd/zfs_object_agent/zettacache/src/block_allocator.rs b/cmd/zfs_object_agent/zettacache/src/block_allocator.rs
index 6b7a2a567a3f..e7269ecbf87c 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_allocator.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_allocator.rs
@@ -1,7 +1,6 @@
 use crate::block_access::BlockAccess;
 use crate::extent_allocator::{ExtentAllocator, ExtentAllocatorBuilder};
 use crate::space_map::{SpaceMap, SpaceMapEntry, SpaceMapPhys};
-use crate::zettacache::DEFAULT_SLAB_SIZE;
 use crate::{base_types::*, DumpSlabsOptions};
 use bimap::BiBTreeMap;
 use lazy_static::lazy_static;
@@ -23,6 +22,7 @@ use util::{get_tunable, TerseVec};
 use util::{nice_p2size, From64};
 
 lazy_static! {
+    static ref DEFAULT_SLAB_SIZE: u32 = get_tunable("default_slab_size", 16 * 1024 * 1024);
     static ref DEFAULT_SLAB_BUCKETS: SlabAllocationBucketsPhys =
         get_tunable("default_slab_buckets", SlabAllocationBucketsPhys::default());
     static ref SLAB_CONDENSE_PER_CHECKPOINT: u64 = get_tunable("slab_condense_per_checkpoint", 10);
diff --git a/cmd/zfs_object_agent/zettacache/src/lib.rs b/cmd/zfs_object_agent/zettacache/src/lib.rs
index 86f2a905d060..907af981320b 100644
--- a/cmd/zfs_object_agent/zettacache/src/lib.rs
+++ b/cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -12,6 +12,7 @@
 mod extent_allocator;
 mod index;
 mod size_histogram;
 mod space_map;
+mod superblock;
 mod zcachedb;
 mod zettacache;
diff --git a/cmd/zfs_object_agent/zettacache/src/superblock.rs b/cmd/zfs_object_agent/zettacache/src/superblock.rs
new file mode 100644
index 000000000000..140aa14081e5
--- /dev/null
+++ b/cmd/zfs_object_agent/zettacache/src/superblock.rs
@@ -0,0 +1,115 @@
+use crate::base_types::*;
+use crate::block_access::*;
+use anyhow::anyhow;
+use anyhow::Result;
+use futures::stream::*;
+use log::*;
+use serde::{Deserialize, Serialize};
+use util::maybe_die_with;
+
+pub const SUPERBLOCK_SIZE: u64 = 4 * 1024;
+
+/// State stored at the beginning of every disk
+#[derive(Serialize, Deserialize, Debug, Clone)]
+struct SuperblockPhys {
+    primary: Option<PrimaryPhys>,
+    disk: DiskId,
+    guid: u64,
+    // XXX put sector size in here too and verify it matches what the disk says now?
+    // XXX put disk size in here so we can detect expansion?
+}
+
+/// State that's only needed on the primary disk (currently, always DiskId(0)).
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct PrimaryPhys {
+    pub checkpoint_id: CheckpointId,
+    pub checkpoint_capacity: Extent, // space available for checkpoints
+    pub checkpoint: Extent, // space used by latest checkpoint
+    pub num_disks: usize,
+}
+
+impl PrimaryPhys {
+    /// Write superblocks to all disks.
+    pub async fn write_all(&self, primary_disk: DiskId, guid: u64, block_access: &BlockAccess) {
+        block_access
+            .disks()
+            .map(|disk| async move {
+                // Change the DiskId of this superblock to match the disk we're writing to.
+                let phys = if disk == primary_disk {
+                    SuperblockPhys {
+                        primary: Some(self.clone()),
+                        disk,
+                        guid,
+                    }
+                } else {
+                    SuperblockPhys {
+                        primary: None,
+                        disk,
+                        guid,
+                    }
+                };
+                phys.write(block_access, disk).await;
+            })
+            .collect::<FuturesUnordered<_>>()
+            .for_each(|_| async move {})
+            .await;
+    }
+
+    pub async fn read(block_access: &BlockAccess) -> Result<(Self, DiskId, u64)> {
+        let superblocks = SuperblockPhys::read_all(block_access).await?;
+
+        let (primary, primary_disk, guid) = superblocks
+            .iter()
+            .find_map(|phys| {
+                phys.primary
+                    .as_ref()
+                    .map(|primary| (primary.clone(), phys.disk, phys.guid))
+            })
+            .ok_or_else(|| anyhow!("Primary Superblock not found"))?;
+
+        for (id, phys) in superblocks.iter().enumerate() {
+            // XXX proper error handling
+            // XXX we should be able to reorder them?
+            assert_eq!(DiskId(id.try_into().unwrap()), phys.disk);
+            assert_eq!(phys.guid, guid);
+            assert!(phys.primary.is_none() || phys.disk == primary_disk);
+        }
+
+        // XXX proper error handling
+        assert_eq!(block_access.disks().count(), primary.num_disks);
+        assert!(primary.checkpoint_capacity.contains(&primary.checkpoint));
+
+        Ok((primary, primary_disk, guid))
+    }
+}
+
+impl SuperblockPhys {
+    async fn read(block_access: &BlockAccess, disk: DiskId) -> Result<Self> {
+        let raw = block_access
+            .read_raw(Extent::new(disk, 0, SUPERBLOCK_SIZE))
+            .await;
+        let (this, _): (Self, usize) = block_access.chunk_from_raw(&raw)?;
+        debug!("got {:#?}", this);
+        assert_eq!(this.disk, disk);
+        Ok(this)
+    }
+
+    async fn read_all(block_access: &BlockAccess) -> Result<Vec<Self>> {
+        block_access
+            .disks()
+            .map(|disk| SuperblockPhys::read(block_access, disk))
+            .collect::<FuturesOrdered<_>>()
+            .try_collect()
+            .await
+    }
+
+    async fn write(&self, block_access: &BlockAccess, disk: DiskId) {
+        maybe_die_with(|| format!("before writing {:#?}", self));
+        debug!("writing {:#?}", self);
+        let raw = block_access.chunk_to_raw(EncodeType::Json, self);
+        // XXX pad it out to SUPERBLOCK_SIZE?
+        block_access
+            .write_raw(DiskLocation { offset: 0, disk }, raw)
+            .await;
+    }
+}
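The commit message notes that this split should make it simpler to add disks. That workflow is not part of this change; the following is only a hypothetical sketch (the `add_disk` helper and its signature are invented here for illustration) of what it could look like now that non-primary disks carry nothing but identifying information:

```rust
// Hypothetical, not in this PR: bring an additional disk into the cache.
// Assumes the BlockAccess already enumerates the new disk.
async fn add_disk(
    primary: &mut PrimaryPhys,
    primary_disk: DiskId,
    guid: u64,
    block_access: &BlockAccess,
) {
    // The new disk only needs an identifying superblock; all cache-wide state
    // stays in the primary's PrimaryPhys.
    primary.num_disks += 1;
    // Rewriting the superblocks stamps every disk, including the new one,
    // with its DiskId and the cache guid.
    primary.write_all(primary_disk, guid, block_access).await;
}
```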
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index 307f390a289f..c13c2a5eea3e 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -12,6 +12,8 @@ use crate::extent_allocator::ExtentAllocatorBuilder;
 use crate::extent_allocator::ExtentAllocatorPhys;
 use crate::index::*;
 use crate::size_histogram::SizeHistogramPhys;
+use crate::superblock::PrimaryPhys;
+use crate::superblock::SUPERBLOCK_SIZE;
 use crate::DumpSlabsOptions;
 use crate::DumpStructuresOptions;
 use anyhow::Result;
@@ -28,7 +30,6 @@ use metered::metered;
 use metered::time_source::StdInstantMicros;
 use more_asserts::*;
 use serde::{Deserialize, Serialize};
-use std::cmp::Ordering;
 use std::collections::btree_map;
 use std::collections::BTreeMap;
 use std::convert::TryFrom;
@@ -52,9 +53,7 @@ use util::LockedItem;
 use util::MutexExt;
 lazy_static! {
-    static ref SUPERBLOCK_SIZE: u64 = get_tunable("superblock_size", 4 * 1024);
     static ref DEFAULT_CHECKPOINT_SIZE_PCT: f64 = get_tunable("default_checkpoint_size_pct", 0.1);
-    pub static ref DEFAULT_SLAB_SIZE: u32 = get_tunable("default_slab_size", 16 * 1024 * 1024);
     static ref DEFAULT_METADATA_SIZE_PCT: f64 = get_tunable("default_metadata_size_pct", 15.0);
     // Can lower this to test forced eviction.
     static ref MAX_PENDING_CHANGES: usize = get_tunable("max_pending_changes", 50_000); // XXX should be based on RAM usage, ~tens of millions at least
     static ref CHECKPOINT_INTERVAL: Duration = Duration::from_secs(get_tunable("checkpoint_interval_secs", 60));
@@ -69,65 +68,6 @@ lazy_static! {
     static ref INDEX_CACHE_ENTRIES_MEM_PCT: usize = get_tunable("index_cache_entries_mem_pct", 10);
 }
 
-#[derive(Serialize, Deserialize, Debug)]
-struct ZettaSuperBlockPhys {
-    checkpoint_id: CheckpointId,
-    checkpoint_capacity: Extent, // space available for checkpoints
-    checkpoint: Extent, // space used by latest checkpoint
-    slab_size: u32,
-    disk: DiskId,
-    num_disks: usize,
-    guid: u64,
-    // XXX put sector size in here too and verify it matches what the disk says now?
-    // XXX put disk size in here so we can detect expansion?
-}
-
-impl ZettaSuperBlockPhys {
-    async fn read(block_access: &BlockAccess, disk: DiskId) -> Result<Self> {
-        let raw = block_access
-            .read_raw(Extent::new(disk, 0, *SUPERBLOCK_SIZE))
-            .await;
-        let (this, _): (Self, usize) = block_access.chunk_from_raw(&raw)?;
-        debug!("got {:#?}", this);
-        assert_eq!(this.disk, disk);
-        Ok(this)
-    }
-
-    async fn read_all(block_access: &BlockAccess) -> Result<Vec<Self>> {
-        block_access
-            .disks()
-            .map(|disk| ZettaSuperBlockPhys::read(block_access, disk))
-            .collect::<FuturesOrdered<_>>()
-            .try_collect()
-            .await
-    }
-
-    async fn write(&self, block_access: &BlockAccess, disk: DiskId) {
-        maybe_die_with(|| format!("before writing {:#?}", self));
-        debug!("writing {:#?}", self);
-        let raw = block_access.chunk_to_raw(EncodeType::Json, self);
-        // XXX pad it out to SUPERBLOCK_SIZE?
-        block_access
-            .write_raw(DiskLocation { offset: 0, disk }, raw)
-            .await;
-    }
-
-    /// Write superblock to all disks. Note that ZettaSuperBlockPhys::disk will
-    /// be changed to the DiskId of each disk that's written.
-    async fn write_all(&self, block_access: &BlockAccess) {
-        block_access
-            .disks()
-            .map(|disk| async move {
-                // Change the DiskId of this superblock to match the disk we're writing to.
-                let phys = Self { disk, ..*self };
-                phys.write(block_access, disk).await
-            })
-            .collect::<FuturesUnordered<_>>()
-            .for_each(|_| async move {})
-            .await;
-    }
-}
-
 #[derive(Serialize, Deserialize, Debug)]
 struct ZettaCheckpointPhys {
     generation: CheckpointId,
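A side effect worth calling out: SUPERBLOCK_SIZE stops being a lazy_static tunable in zettacache.rs and becomes a plain `pub const` in superblock.rs, which is why the `*` dereference disappears at the call sites changed in the hunks below (and why the superblock size is no longer tunable at runtime). A before/after sketch, both halves taken from this diff and shown together only for contrast:

```rust
// Before (zettacache.rs): a lazy_static tunable, dereferenced at each use.
lazy_static! {
    static ref SUPERBLOCK_SIZE: u64 = get_tunable("superblock_size", 4 * 1024);
}
// ... Extent::new(disk, 0, *SUPERBLOCK_SIZE)

// After (superblock.rs): a plain constant; no deref, no runtime tunable.
pub const SUPERBLOCK_SIZE: u64 = 4 * 1024;
// ... Extent::new(disk, 0, SUPERBLOCK_SIZE)
```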
@@ -472,7 +412,9 @@ impl MergeState {
 
 struct ZettaCacheState {
     block_access: Arc<BlockAccess>,
-    super_phys: ZettaSuperBlockPhys,
+    primary: PrimaryPhys,
+    guid: u64,
+    primary_disk: DiskId,
     block_allocator: BlockAllocator,
     pending_changes: BTreeMap<IndexKey, PendingChange>,
     // Keep state associated with any on-going merge here
@@ -537,7 +479,7 @@ impl ZettaCache {
                 }
             })
             .unwrap(),
-            *SUPERBLOCK_SIZE,
+            SUPERBLOCK_SIZE,
             block_access.round_up_to_sector(
                 (*DEFAULT_CHECKPOINT_SIZE_PCT / 100.0 * total_capacity as f64)
                     .approx_as::<u64>()
@@ -552,7 +494,7 @@
             let start = if disk == checkpoint_capacity.location.disk {
                 checkpoint_capacity.location.offset + checkpoint_capacity.size
             } else {
-                *SUPERBLOCK_SIZE
+                SUPERBLOCK_SIZE
             };
             Extent::new(
                 disk,
@@ -598,16 +540,13 @@
             .write_raw(checkpoint_extent.location, raw)
             .await;
         let num_disks = block_access.disks().count();
-        ZettaSuperBlockPhys {
+        PrimaryPhys {
             checkpoint_id: CheckpointId(0),
             checkpoint_capacity,
             checkpoint: checkpoint_extent,
-            slab_size: *DEFAULT_SLAB_SIZE,
-            disk: DiskId(0), // will be changed by .write_all()
             num_disks,
-            guid,
         }
-        .write_all(block_access)
+        .write_all(DiskId(0), guid, block_access)
         .await;
     }
@@ -617,49 +556,18 @@
             false,
         ));
 
-        let super_blocks = match ZettaSuperBlockPhys::read_all(&block_access).await {
-            Ok(super_blocks) => super_blocks,
+        let (primary, primary_disk, guid) = match PrimaryPhys::read(&block_access).await {
+            Ok(tuple) => tuple,
             Err(_) => {
                 // XXX need proper create CLI
                 Self::create(&block_access).await;
-                ZettaSuperBlockPhys::read_all(&block_access).await.unwrap()
+                PrimaryPhys::read(&block_access).await.unwrap()
             }
         };
 
-        let latest_super_block = super_blocks
-            .into_iter()
-            .enumerate()
-            .map(|(id, phys)| {
-                // XXX proper error handling
-                // XXX we should be able to reorder them?
-                assert_eq!(DiskId(id.try_into().unwrap()), phys.disk);
-                phys
-            })
-            .reduce(|x, y| {
-                // XXX proper error handling
-                assert_eq!(x.guid, y.guid);
-                assert_eq!(x.num_disks, y.num_disks);
-                match x.checkpoint_id.cmp(&y.checkpoint_id) {
-                    Ordering::Greater => x,
-                    Ordering::Less => y,
-                    Ordering::Equal => {
-                        assert_eq!(x.checkpoint, y.checkpoint);
-                        x
-                    }
-                }
-            })
-            .unwrap();
+        let checkpoint = ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await;
 
-        // XXX proper error handling
-        assert_eq!(paths.len(), latest_super_block.num_disks);
-        assert!(latest_super_block
-            .checkpoint_capacity
-            .contains(&latest_super_block.checkpoint));
-
-        let checkpoint =
-            ZettaCheckpointPhys::read(&block_access, latest_super_block.checkpoint).await;
-
-        assert_eq!(checkpoint.generation, latest_super_block.checkpoint_id);
+        assert_eq!(checkpoint.generation, primary.checkpoint_id);
 
         let mut builder = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator);
         checkpoint.claim(&mut builder);
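Summarizing the open-time change above: previously every disk's superblock carried the checkpoint pointer, so open() had to read them all and reduce to the one with the newest checkpoint_id; now PrimaryPhys::read locates the one disk whose superblock carries Some(PrimaryPhys), and the reduce step disappears. A rough sketch, condensed from the hunk above with error handling elided:

```rust
// Condensed from the open() hunk above; asserts stand in for the
// "XXX proper error handling" noted in the code.
let (primary, primary_disk, guid) = PrimaryPhys::read(&block_access).await?;
// PrimaryPhys::read (superblock.rs) has already verified, for every disk i:
//   superblock[i].disk == DiskId(i), superblock[i].guid == guid,
//   only `primary_disk` carries Some(PrimaryPhys),
//   disks().count() == primary.num_disks, and the checkpoint extent fits.
let checkpoint = ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await;
assert_eq!(checkpoint.generation, primary.checkpoint_id);
```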
@@ -708,7 +616,9 @@
             atime_histogram,
             size_histogram: checkpoint.size_histogram,
             operation_log,
-            super_phys: latest_super_block,
+            primary,
+            primary_disk,
+            guid,
             outstanding_reads: Default::default(),
             outstanding_writes: Default::default(),
             atime: checkpoint.last_atime,
@@ -1235,7 +1145,9 @@
 
 pub struct ZCacheDBHandle {
     block_access: Arc<BlockAccess>,
-    superblock: ZettaSuperBlockPhys,
+    primary: PrimaryPhys,
+    primary_disk: DiskId,
+    guid: u64,
     checkpoint: Arc<ZettaCheckpointPhys>,
     extent_allocator: Arc<ExtentAllocator>,
 }
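Both ZettaCacheState and ZCacheDBHandle (hunks above) replace the single embedded superblock with the (primary, primary_disk, guid) triple, so call sites change mechanically; the checkpoint-flush hunks further below follow the same pattern. Quoting that pattern here as isolated before/after fragments for contrast:

```rust
// Before: one struct held everything, and write_all() needed only block access.
self.super_phys.checkpoint_id = self.super_phys.checkpoint_id.next();
self.super_phys.write_all(&self.block_access).await;

// After: cache-wide state lives in PrimaryPhys; the identifying bits
// (primary disk, guid) are passed back in when rewriting the superblocks.
self.primary.checkpoint_id = self.primary.checkpoint_id.next();
self.primary
    .write_all(self.primary_disk, self.guid, &self.block_access)
    .await;
```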
@@ -1247,40 +1159,10 @@ impl ZCacheDBHandle {
             true,
         ));
 
-        let super_blocks = ZettaSuperBlockPhys::read_all(&block_access).await?;
-
-        let superblock = super_blocks
-            .into_iter()
-            .enumerate()
-            .map(|(id, phys)| {
-                // XXX proper error handling
-                // XXX we should be able to reorder them?
-                assert_eq!(DiskId(id.try_into().unwrap()), phys.disk);
-                phys
-            })
-            .reduce(|x, y| {
-                // XXX proper error handling
-                assert_eq!(x.guid, y.guid);
-                assert_eq!(x.num_disks, y.num_disks);
-                match x.checkpoint_id.cmp(&y.checkpoint_id) {
-                    Ordering::Greater => x,
-                    Ordering::Less => y,
-                    Ordering::Equal => {
-                        assert_eq!(x.checkpoint, y.checkpoint);
-                        x
-                    }
-                }
-            })
-            .unwrap();
-
-        // XXX proper error handling
-        assert_eq!(paths.len(), superblock.num_disks);
-        assert!(superblock
-            .checkpoint_capacity
-            .contains(&superblock.checkpoint));
+        let (primary, primary_disk, guid) = PrimaryPhys::read(&block_access).await?;
 
         let checkpoint =
-            Arc::new(ZettaCheckpointPhys::read(&block_access, superblock.checkpoint).await);
+            Arc::new(ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await);
         let mut builder = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator);
         // We should be able to get away without claiming the metadata space,
@@ -1291,37 +1173,26 @@
         Ok(ZCacheDBHandle {
             block_access,
-            superblock,
+            primary,
+            primary_disk,
+            guid,
             checkpoint,
             extent_allocator,
         })
     }
 
     pub async fn dump_free_space(&self) {
-        println!(
-            "[{:>6}-{:>6}) Superblock",
-            nice_p2size(0),
-            nice_p2size(*SUPERBLOCK_SIZE)
-        );
-        let superblock_len = self
-            .block_access
-            .chunk_to_raw(EncodeType::Json, &self.superblock)
-            .len() as u64;
-        println!(
-            "  {} used out of {} ({:.1}%)",
-            nice_p2size(superblock_len),
-            nice_p2size(*SUPERBLOCK_SIZE),
-            superblock_len as f64 * 100.0 / *SUPERBLOCK_SIZE as f64
-        );
+        println!("Superblock");
+        println!("  Primary {:?}, GUID: {}", self.primary_disk, self.guid);
         println!();
         println!("Checkpoint Region");
-        println!("  {:?}", self.superblock.checkpoint_capacity);
+        println!("  {:?}", self.primary.checkpoint_capacity);
         println!(
             "  checkpoint: {} used out of {} ({:.1}%, must be <50%)",
-            nice_p2size(self.superblock.checkpoint.size),
-            nice_p2size(self.superblock.checkpoint_capacity.size),
-            self.superblock.checkpoint.size as f64 * 100.0
-                / self.superblock.checkpoint_capacity.size as f64
+            nice_p2size(self.primary.checkpoint.size),
+            nice_p2size(self.primary.checkpoint_capacity.size),
+            self.primary.checkpoint.size as f64 * 100.0
+                / self.primary.checkpoint_capacity.size as f64
         );
         println!();
         println!("Metadata Region");
@@ -1419,7 +1290,7 @@
 
     pub async fn dump_structures(&self, opts: DumpStructuresOptions) {
         if opts.dump_defaults {
-            println!("{:#?}", self.superblock);
+            println!("{:#?}", self.primary);
             println!("{:#?}", self.checkpoint);
         }
 
@@ -1721,7 +1592,7 @@ impl ZettaCacheState {
     ) {
         debug!(
             "flushing checkpoint {:?}",
-            self.super_phys.checkpoint_id.next()
+            self.primary.checkpoint_id.next()
         );
 
         let begin_checkpoint = Instant::now();
@@ -1790,7 +1661,7 @@
         });
 
         let checkpoint = ZettaCheckpointPhys {
-            generation: self.super_phys.checkpoint_id.next(),
+            generation: self.primary.checkpoint_id.next(),
             extent_allocator: self.extent_allocator.get_phys(),
             index: index.get_phys(),
             operation_log: operation_log_phys,
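The next hunk only renames super_phys to primary, but it is the heart of how the primary superblock is used at runtime: each new checkpoint is placed right after the previous one inside primary.checkpoint_capacity, wrapping back to the start of that region when it would not fit, and only afterwards is the primary superblock rewritten to point at it. A condensed sketch of that placement logic, with the assertions elided:

```rust
// Condensed from the hunk below (field names as in this diff).
let mut checkpoint_extent = Extent::new(
    self.primary.checkpoint.location.disk,
    // Tentatively place the new checkpoint right after the current one.
    self.primary.checkpoint.location.offset + self.primary.checkpoint.size,
    raw.len() as u64,
);
if !self.primary.checkpoint_capacity.contains(&checkpoint_extent) {
    // Out of room at the end of the region: wrap to its beginning. The new
    // checkpoint must not overlap the still-live one, which is why
    // dump_free_space() reports that checkpoint usage "must be <50%".
    checkpoint_extent.location.offset = self.primary.checkpoint_capacity.location.offset;
}
```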
@@ -1823,25 +1694,25 @@
             .chunk_to_raw(EncodeType::Json, &checkpoint);
         let mut checkpoint_extent = Extent::new(
-            self.super_phys.checkpoint.location.disk,
-            self.super_phys.checkpoint.location.offset + self.super_phys.checkpoint.size,
+            self.primary.checkpoint.location.disk,
+            self.primary.checkpoint.location.offset + self.primary.checkpoint.size,
             raw.len() as u64,
         );
         if !self
-            .super_phys
+            .primary
             .checkpoint_capacity
             .contains(&checkpoint_extent)
         {
             // Out of space; go back to the beginning of the checkpoint space.
-            checkpoint_extent.location.offset = self.super_phys.checkpoint_capacity.location.offset;
+            checkpoint_extent.location.offset = self.primary.checkpoint_capacity.location.offset;
             assert!(self
-                .super_phys
+                .primary
                 .checkpoint_capacity
                 .contains(&checkpoint_extent));
             assert_le!(
                 checkpoint_extent.location.offset + checkpoint_extent.size,
-                self.super_phys.checkpoint.location.offset
+                self.primary.checkpoint.location.offset
             );
             // XXX The above assertion could fail if there isn't enough
             // checkpoint space for 3 checkpoints (the existing one that
@@ -1858,15 +1729,17 @@
             .write_raw(checkpoint_extent.location, raw)
             .await;
 
-        self.super_phys.checkpoint = checkpoint_extent;
-        self.super_phys.checkpoint_id = self.super_phys.checkpoint_id.next();
-        self.super_phys.write_all(&self.block_access).await;
+        self.primary.checkpoint = checkpoint_extent;
+        self.primary.checkpoint_id = self.primary.checkpoint_id.next();
+        self.primary
+            .write_all(self.primary_disk, self.guid, &self.block_access)
+            .await;
 
         self.extent_allocator.checkpoint_done();
 
         info!(
             "completed {:?} in {}ms; flushed {} operations ({}KB) to log",
-            self.super_phys.checkpoint_id,
+            self.primary.checkpoint_id,
             begin_checkpoint.elapsed().as_millis(),
             operation_log_len,
             operation_log_bytes / 1024,