Skip to content

Commit

Permalink
Merge pull request openzfs#455 from delphix/projects/merge-upstream/m…
Browse files Browse the repository at this point in the history
…aster

Merge remote-tracking branch '6.0/stage' into 'master'
  • Loading branch information
grwilson authored Jun 1, 2022
2 parents cfd91c2 + be56f82 commit cb212d2
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 51 deletions.
12 changes: 12 additions & 0 deletions cmd/zfs_object_agent/util/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub const TYPE_LIST_DEVICES: &str = "list devices";
pub const TYPE_ZCACHE_IOSTAT: &str = "zcache iostat";
pub const TYPE_ZCACHE_STATS: &str = "zcache stats";
pub const TYPE_ADD_DISK: &str = "add disk";
pub const TYPE_EXPAND_DISK: &str = "expand disk";
pub const TYPE_SYNC_CHECKPOINT: &str = "sync checkpoint";
pub const TYPE_INITIATE_MERGE: &str = "initiate merge";

Expand All @@ -145,5 +146,16 @@ pub struct AddDiskRequest {
pub path: PathBuf,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ExpandDiskRequest {
pub path: PathBuf,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ExpandDiskResponse {
pub new_size: u64,
pub additional_bytes: u64,
}

// We assume that a single write of this size is atomic.
pub const SUPERBLOCK_SIZE: usize = 4 * 1024;
52 changes: 52 additions & 0 deletions cmd/zfs_object_agent/zcache/src/expand.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! `zcache expand` subcommand
use std::path::PathBuf;

use anyhow::Result;
use async_trait::async_trait;
use clap::Parser;
use util::message::ExpandDiskRequest;
use util::message::ExpandDiskResponse;
use util::message::TYPE_EXPAND_DISK;
use util::nice_p2size;
use util::writeln_stdout;

use crate::remote_channel::RemoteChannel;
use crate::subcommand::ZcacheSubCommand;

#[derive(Parser)]
#[clap(about = "Expand a disk in the ZettaCache.")]
pub struct Expand {
path: PathBuf,
}

#[async_trait]
impl ZcacheSubCommand for Expand {
async fn invoke(&self) -> Result<()> {
let mut remote = RemoteChannel::new(true).await?;

let request = ExpandDiskRequest {
path: self.path.clone(),
};

let nvlist = remote
.call(TYPE_EXPAND_DISK, Some(nvpair::to_nvlist(&request).unwrap()))
.await?;
let response: ExpandDiskResponse = nvpair::from_nvlist(&nvlist)?;
if response.additional_bytes > 0 {
writeln_stdout!(
"Disk {:?} expanded, new size {} (added {})",
self.path,
nice_p2size(response.new_size),
nice_p2size(response.additional_bytes)
);
} else {
writeln_stdout!(
"Disk {:?} expansion not needed, size {}",
self.path,
nice_p2size(response.new_size)
);
}
Ok(())
}
}
3 changes: 3 additions & 0 deletions cmd/zfs_object_agent/zcache/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![deny(clippy::print_stderr)]

mod add;
mod expand;
mod hits;
mod iostat;
mod labelclear;
Expand Down Expand Up @@ -59,6 +60,7 @@ enum Commands {
List(list::List),
Stats(stats::Stats),
Add(add::Add),
Expand(expand::Expand),
Sync(sync::Sync),
Labelclear(labelclear::Labelclear),

Expand All @@ -75,6 +77,7 @@ impl Commands {
Commands::List(list) => list,
Commands::Stats(stats) => stats,
Commands::Add(add) => add,
Commands::Expand(expand) => expand,
Commands::Sync(sync) => sync,
Commands::Labelclear(labelclear) => labelclear,
Commands::ClearHitData(clear_hit_data) => clear_hit_data,
Expand Down
75 changes: 57 additions & 18 deletions cmd/zfs_object_agent/zettacache/src/block_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::sync::RwLock;
use std::thread::sleep;
use std::time::Duration;
Expand All @@ -35,6 +36,7 @@ use tokio::sync::oneshot;
use util::from64::AsUsize;
use util::iter_wrapping;
use util::measure;
use util::message::ExpandDiskResponse;
use util::serde::from_json_slice;
use util::tunable;
use util::with_alloctag;
Expand Down Expand Up @@ -149,7 +151,7 @@ pub struct Disk {

path: PathBuf,
canonical_path: PathBuf,
size: u64,
size: Mutex<u64>,
sector_size: usize,
#[derivative(Debug = "ignore")]
io_stats: &'static DiskIoStats,
Expand Down Expand Up @@ -216,20 +218,7 @@ impl Disk {
.with_context(|| format!("opening disk {path:?}"))?;
// see comment in `struct Disk`
let file = &*Box::leak(Box::new(file));
let stat = nix::sys::stat::fstat(file.as_raw_fd())?;
trace!("stat: {:?}", stat);
let mode = SFlag::from_bits_truncate(stat.st_mode);
let sector_size;
let size;
if mode.contains(SFlag::S_IFBLK) {
size = blkgetsize64(file)?;
sector_size = blksszget(file)?;
} else if mode.contains(SFlag::S_IFREG) {
size = u64::try_from(stat.st_size)?;
sector_size = *MIN_SECTOR_SIZE;
} else {
panic!("{path:?}: invalid file type {mode:?}");
}
let (sector_size, size) = disk_sizes(file)?;

let short_name = path.file_name().unwrap().to_string_lossy().to_string();
let canonical_path = Path::new(path).canonicalize()?;
Expand Down Expand Up @@ -258,7 +247,7 @@ impl Disk {
file,
path: path.to_owned(),
canonical_path,
size,
size: Mutex::new(size),
sector_size,
io_stats,
reader_tx,
Expand Down Expand Up @@ -560,13 +549,46 @@ impl BlockAccess {
Ok(id)
}

// Returns the number of bytes added to the disk.
pub fn expand_disk(&self, disk: DiskId) -> Result<ExpandDiskResponse> {
let disks = self.disks.read().unwrap();
let disk = &disks[disk.index()];
let (_, new_size) = disk_sizes(disk.file)?;
let mut size = disk.size.lock().unwrap();
let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| {
anyhow!(
"{disk:?} {:?} ({:?}) size decreased from {size} to {new_size}",
disk.path,
disk.canonical_path,
)
})?;
*size = new_size;
Ok(ExpandDiskResponse {
additional_bytes,
new_size,
})
}

/// Note: In the future we'll support device removal in which case the
/// DiskId's will probably not be sequential. By using this accessor we
/// need not assume anything about the values inside the DiskId's.
pub fn disks(&self) -> impl Iterator<Item = DiskId> {
(0..self.disks.read().unwrap().len()).map(DiskId::new)
}

pub fn path_to_disk_id(&self, path: &Path) -> Result<DiskId> {
let canonical_path = path.canonicalize()?;
self.disks
.read()
.unwrap()
.iter()
.position(|disk| disk.canonical_path == canonical_path)
.map(DiskId::new)
.ok_or_else(|| {
anyhow!("disk {path:?} ({canonical_path:?}) is not part of the zettacache")
})
}

// Gather a list of devices for zcache list_devices command.
pub fn list_devices(&self) -> DeviceList {
let devices = self
Expand All @@ -576,14 +598,17 @@ impl BlockAccess {
.iter()
.map(|d| DeviceEntry {
name: d.path.clone(),
size: d.size,
size: *d.size.lock().unwrap(),
})
.collect();
DeviceList { devices }
}

pub fn disk_size(&self, disk: DiskId) -> u64 {
self.disks.read().unwrap()[disk.index()].size
*self.disks.read().unwrap()[disk.index()]
.size
.lock()
.unwrap()
}

pub fn disk_extent(&self, disk: DiskId) -> Extent {
Expand Down Expand Up @@ -820,6 +845,20 @@ fn blksszget(file: &File) -> Result<usize> {
Ok(ssz)
}

/// get (sector_size, disk_size), both in bytes
fn disk_sizes(file: &File) -> Result<(usize, u64)> {
let stat = nix::sys::stat::fstat(file.as_raw_fd())?;
trace!("stat: {:?}", stat);
let mode = SFlag::from_bits_truncate(stat.st_mode);
if mode.contains(SFlag::S_IFBLK) {
Ok((blksszget(file)?, blkgetsize64(file)?))
} else if mode.contains(SFlag::S_IFREG) {
Ok((*MIN_SECTOR_SIZE, u64::try_from(stat.st_size)?))
} else {
panic!("{file:?}: invalid file type {mode:?}");
}
}

/// use pread() to read into an aligned vector
fn pread_aligned(file: &File, offset: i64, len: usize, alignment: usize) -> Result<AlignedVec> {
let mut vec = with_alloctag("pread()", || AlignedVec::with_capacity(len, alignment));
Expand Down
39 changes: 39 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use anyhow::anyhow;
use anyhow::Result;
use arc_swap::ArcSwapAny;
use arc_swap::ArcSwapOption;
Expand Down Expand Up @@ -48,6 +49,7 @@ use tokio::time::sleep_until;
use util::concurrent_batch::ConcurrentBatch;
use util::lock_non_send;
use util::measure;
use util::message::ExpandDiskResponse;
use util::nice_p2size;
use util::super_trace;
use util::tunable;
Expand Down Expand Up @@ -965,6 +967,13 @@ impl ZettaCache {
}
}

pub async fn expand_disk(&self, path: &Path) -> Result<ExpandDiskResponse> {
match &*self.inner.load() {
Some(inner) => inner.expand_disk(path).await,
None => Err(anyhow!("disk {path:?} is not part of the zettacache")),
}
}

pub async fn initiate_merge(&self) {
if let Some(inner) = &*self.inner.load() {
inner.initiate_merge().await;
Expand Down Expand Up @@ -2065,6 +2074,13 @@ impl Inner {
Ok(())
}

// Returns the amount of additional space, in bytes
async fn expand_disk(&self, path: &Path) -> Result<ExpandDiskResponse> {
let additional_bytes = self.locked.lock().await.expand_disk(path)?;
self.sync_checkpoint().await;
Ok(additional_bytes)
}

async fn initiate_merge(&self) {
self.locked.lock().await.request_merge();
self.sync_checkpoint().await;
Expand Down Expand Up @@ -2827,6 +2843,29 @@ impl Locked {
Ok(())
}

// Returns the amount of additional space, in bytes.
fn expand_disk(&mut self, path: &Path) -> Result<ExpandDiskResponse> {
let disk = self.block_access.path_to_disk_id(path)?;
let response = self.block_access.expand_disk(disk)?;
if response.additional_bytes > 0 {
let phys = self.primary.disks.get_mut(&disk).unwrap();
let expanded_capacity = Extent::new(disk, phys.size, response.additional_bytes);
info!("expanding existing disk {path:?}: {expanded_capacity:?}");
self.slab_allocator.extend(expanded_capacity);

// Update disk size in primary
phys.size = response.new_size;

// The hit data isn't accurate across cache size changes, so clear
// it, which also updates the histogram parameters to reflect the
// new cache size.
self.clear_hit_data();
} else {
info!("{disk:?} ({path:?}) has no expansion capacity");
}
Ok(response)
}

fn request_merge(&mut self) {
self.merge_requested = true;
}
Expand Down
12 changes: 5 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::time::Instant;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use async_stream::stream;
use async_stream::try_stream;
use async_trait::async_trait;
use azure_core::HttpError;
use azure_identity::token_credentials::ImdsManagedIdentityCredential;
Expand Down Expand Up @@ -548,11 +548,11 @@ impl ObjectAccessTrait for BlobObjectAccess {
start_after: Option<String>,
use_delimiter: bool,
list_prefixes: bool,
) -> Pin<Box<dyn Stream<Item = String> + Send + '_>> {
) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
let msg = format!("list {} (after {:?})", prefix, start_after);
let list_prefix = prefix;

let stream_result = stream! {
Box::pin(try_stream! {
let output = retry(&msg, None, || async {
let container_client = self.get_container_client().await;
let list_builder = match use_delimiter {
Expand All @@ -575,7 +575,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
Ok(res) => Ok(res),
}
})
.await.unwrap();
.await?;

// XXX The performance of this is likely to be quite bad. We need a better solution. DOSE-1215
let initial = start_after.unwrap_or("".to_string());
Expand All @@ -594,9 +594,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
}
}
}
};

Box::pin(stream_result)
})
}
}

Expand Down
Loading

0 comments on commit cb212d2

Please sign in to comment.