Skip to content

Commit

Permalink
Primary Superblock (openzfs#64)
Browse files Browse the repository at this point in the history
Change the way we store the superblock on multiple disks. Now, one of the disks is designated as the "primary", which has the single source of truth about cache-wide info (checkpoint ID, checkpoint location, etc). The other disks only have identifying information (disk ID, cache ID).

This reorganization should make it simpler to add disks.

Also, move the superblock handling code to its own file.
  • Loading branch information
ahrens authored Dec 13, 2021
1 parent 8c900fe commit dbc6af6
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 175 deletions.
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/zettacache/src/block_allocator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::block_access::BlockAccess;
use crate::extent_allocator::{ExtentAllocator, ExtentAllocatorBuilder};
use crate::space_map::{SpaceMap, SpaceMapEntry, SpaceMapPhys};
use crate::zettacache::DEFAULT_SLAB_SIZE;
use crate::{base_types::*, DumpSlabsOptions};
use bimap::BiBTreeMap;
use lazy_static::lazy_static;
Expand All @@ -23,6 +22,7 @@ use util::{get_tunable, TerseVec};
use util::{nice_p2size, From64};

lazy_static! {
static ref DEFAULT_SLAB_SIZE: u32 = get_tunable("default_slab_size", 16 * 1024 * 1024);
static ref DEFAULT_SLAB_BUCKETS: SlabAllocationBucketsPhys =
get_tunable("default_slab_buckets", SlabAllocationBucketsPhys::default());
static ref SLAB_CONDENSE_PER_CHECKPOINT: u64 = get_tunable("slab_condense_per_checkpoint", 10);
Expand Down
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettacache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod extent_allocator;
mod index;
mod size_histogram;
mod space_map;
mod superblock;
mod zcachedb;
mod zettacache;

Expand Down
115 changes: 115 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/superblock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use crate::base_types::*;
use crate::block_access::*;
use anyhow::anyhow;
use anyhow::Result;
use futures::stream::*;
use log::*;
use serde::{Deserialize, Serialize};
use util::maybe_die_with;

/// Bytes reserved at the beginning of each disk for the superblock (4 KiB).
pub const SUPERBLOCK_SIZE: u64 = 4 * 1024;

/// State stored at the beginning of every disk.
///
/// Every disk carries identifying information (`disk`, `guid`); exactly one
/// disk — the primary — additionally carries the cache-wide state in
/// `primary` (see `PrimaryPhys`).
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SuperblockPhys {
    /// `Some` only on the primary disk; `None` on every other disk.
    primary: Option<PrimaryPhys>,
    /// Which disk this superblock was written to; verified against the disk
    /// it was read from.
    disk: DiskId,
    /// Cache-wide identifier; must match across all disks of the cache.
    guid: u64,
    // XXX put sector size in here too and verify it matches what the disk says now?
    // XXX put disk size in here so we can detect expansion?
}

/// State that's only needed on the primary disk (currently, always DiskId(0)).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrimaryPhys {
    /// Identity of the most recent checkpoint.
    pub checkpoint_id: CheckpointId,
    pub checkpoint_capacity: Extent, // space available for checkpoints
    pub checkpoint: Extent,          // space used by latest checkpoint
    /// Number of disks in the cache; verified against the actual disk count
    /// when the superblocks are read back.
    pub num_disks: usize,
}

impl PrimaryPhys {
    /// Write superblocks to all disks.
    ///
    /// The superblock written to `primary_disk` carries a clone of `self`
    /// (the cache-wide state); every other disk gets only its identifying
    /// information.  All writes are issued concurrently and awaited together.
    pub async fn write_all(&self, primary_disk: DiskId, guid: u64, block_access: &BlockAccess) {
        let writes = block_access
            .disks()
            .map(|disk| async move {
                // Only the primary disk's superblock carries the cache-wide state.
                let primary = if disk == primary_disk {
                    Some(self.clone())
                } else {
                    None
                };
                let phys = SuperblockPhys {
                    primary,
                    disk,
                    guid,
                };
                phys.write(block_access, disk).await;
            })
            .collect::<FuturesUnordered<_>>();
        // Drain the stream so every write completes before returning.
        writes.for_each(|_| async move {}).await;
    }

    /// Read the superblocks from all disks, locate the primary, and verify
    /// that the set of disks is consistent with it.
    ///
    /// Returns the primary state, the `DiskId` it was found on, and the
    /// cache guid.  Errors if no primary superblock is present; panics
    /// (XXX proper error handling) on inconsistencies between disks.
    pub async fn read(block_access: &BlockAccess) -> Result<(Self, DiskId, u64)> {
        let superblocks = SuperblockPhys::read_all(block_access).await?;

        // Find the one superblock that carries the cache-wide state.
        let mut found = None;
        for phys in &superblocks {
            if let Some(primary) = &phys.primary {
                found = Some((primary.clone(), phys.disk, phys.guid));
                break;
            }
        }
        let (primary, primary_disk, guid) =
            found.ok_or_else(|| anyhow!("Primary Superblock not found"))?;

        for (index, phys) in superblocks.iter().enumerate() {
            // XXX proper error handling
            // XXX we should be able to reorder them?
            assert_eq!(DiskId(index.try_into().unwrap()), phys.disk);
            assert_eq!(phys.guid, guid);
            assert!(phys.primary.is_none() || phys.disk == primary_disk);
        }

        // XXX proper error handling
        assert_eq!(block_access.disks().count(), primary.num_disks);
        assert!(primary.checkpoint_capacity.contains(&primary.checkpoint));

        Ok((primary, primary_disk, guid))
    }
}

impl SuperblockPhys {
    /// Read and decode the superblock at the start of `disk`.
    ///
    /// Panics if the decoded superblock claims to belong to a different disk.
    async fn read(block_access: &BlockAccess, disk: DiskId) -> Result<SuperblockPhys> {
        let extent = Extent::new(disk, 0, SUPERBLOCK_SIZE);
        let bytes = block_access.read_raw(extent).await;
        let (phys, _consumed): (Self, usize) = block_access.chunk_from_raw(&bytes)?;
        debug!("got {:#?}", phys);
        assert_eq!(phys.disk, disk);
        Ok(phys)
    }

    /// Read the superblocks of all disks concurrently, returning them in
    /// disk order; fails on the first read/decode error.
    async fn read_all(block_access: &BlockAccess) -> Result<Vec<SuperblockPhys>> {
        let reads = block_access
            .disks()
            .map(|disk| Self::read(block_access, disk))
            .collect::<FuturesOrdered<_>>();
        reads.try_collect().await
    }

    /// Encode this superblock as JSON and write it to the start of `disk`.
    async fn write(&self, block_access: &BlockAccess, disk: DiskId) {
        maybe_die_with(|| format!("before writing {:#?}", self));
        debug!("writing {:#?}", self);
        let encoded = block_access.chunk_to_raw(EncodeType::Json, self);
        // XXX pad it out to SUPERBLOCK_SIZE?
        let location = DiskLocation { offset: 0, disk };
        block_access.write_raw(location, encoded).await;
    }
}
Loading

0 comments on commit dbc6af6

Please sign in to comment.