Commit

Merge pull request openzfs#421 from delphix/projects/merge-upstream/master
Prakash Surya authored May 15, 2022
2 parents 2afefd0 + 9e10ef8 commit 1536a42
Showing 11 changed files with 289 additions and 43 deletions.
26 changes: 26 additions & 0 deletions cmd/zfs_object_agent/util/src/from64.rs
@@ -1,3 +1,7 @@
use bytesize::ByteSize;

use crate::tunable::ByteSize32;

/// Conversions that are safe assuming that we are on LP64 (usize == u64)
pub trait From64<A> {
fn from64(a: A) -> Self;
@@ -8,3 +12,25 @@ impl From64<u64> for usize {
a.try_into().unwrap()
}
}

pub trait AsUsize {
fn as_usize(&self) -> usize;
}

impl AsUsize for ByteSize {
fn as_usize(&self) -> usize {
usize::from64(self.as_u64())
}
}

impl AsUsize for ByteSize32 {
fn as_usize(&self) -> usize {
self.as_u32() as usize
}
}

impl AsUsize for u64 {
fn as_usize(&self) -> usize {
usize::from64(*self)
}
}
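
For context, a minimal sketch (not part of the commit) of how the new AsUsize helper and the existing From64 trait are meant to be used together; the function and variable names below are illustrative:

use bytesize::ByteSize;
use util::from64::AsUsize;
use util::From64;

fn total_bytes(len: u64, buf: ByteSize) -> usize {
    // Both conversions go through try_into().unwrap() (or a plain widening
    // cast for ByteSize32), so they are lossless on LP64 targets where
    // usize == u64.
    usize::from64(len) + buf.as_usize()
}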
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/util/src/lib.rs
@@ -11,7 +11,7 @@ mod btreemap_ext;
pub mod concurrent_batch;
mod credentials;
mod die;
mod from64;
pub mod from64;
pub mod lazy_static_ptr;
mod lock_set;
mod logging;
37 changes: 37 additions & 0 deletions cmd/zfs_object_agent/util/src/measure.rs
@@ -7,6 +7,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::sync::Once;
use std::time::Instant;

use futures::FutureExt;
use tokio::task::JoinHandle;
@@ -23,6 +24,7 @@ pub struct Measurement {
count: AtomicU64,
inflight: AtomicU64,
fut_size: AtomicUsize,
nanos: AtomicU64,
}

impl Measurement {
@@ -34,6 +36,7 @@ impl Measurement {
count: AtomicU64::new(0),
inflight: AtomicU64::new(0),
fut_size: AtomicUsize::new(0),
nanos: AtomicU64::new(0),
}
}

@@ -55,6 +58,7 @@ impl Measurement {
) -> impl Future<Output = R> + 'b
where
'a: 'b,
R: 'b,
{
if self.fut_size.load(Ordering::Relaxed) == 0 {
// Multiple threads may race to set this, but they will all store the same value, so
@@ -70,6 +74,22 @@
})
}

pub fn fut_timed<'a, 'b, R>(
&'a self,
future: impl Future<Output = R> + 'b,
) -> impl Future<Output = R> + 'b
where
'a: 'b,
R: 'b,
{
let begin = Instant::now();
self.fut(future).inspect(move |_| {
#[allow(clippy::cast_possible_truncation)]
let elapsed = begin.elapsed().as_nanos() as u64;
self.nanos.fetch_add(elapsed, Ordering::Relaxed);
})
}

/// Measure the execution of the provided closure.
pub fn func<F, R>(&self, f: F) -> R
where
@@ -82,6 +102,19 @@
result
}

/// Measure the execution of the provided closure.
pub fn func_timed<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let begin = Instant::now();
let result = self.func(f);
#[allow(clippy::cast_possible_truncation)]
let elapsed = begin.elapsed().as_nanos() as u64;
self.nanos.fetch_add(elapsed, Ordering::Relaxed);
result
}

/// `tokio::spawn()` a new future, and measure its execution. Within zfs_object_agent, this
/// should generally be used instead of plain `tokio::spawn()`.
pub fn spawn<T>(&'static self, future: T) -> JoinHandle<T::Output>
@@ -106,6 +139,10 @@ impl Display for Measurement {
self.count.load(Ordering::Relaxed),
self.inflight.load(Ordering::Relaxed)
)?;
let nanos = self.nanos.load(Ordering::Relaxed);
if nanos != 0 {
write!(f, ", {}ms total", nanos / 1_000_000)?;
}
let fut_size = self.fut_size.load(Ordering::Relaxed);
if fut_size != 0 {
write!(f, ", fut_size {}B", fut_size)?;
99 changes: 99 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/aggregating_writer.rs
@@ -0,0 +1,99 @@
use std::sync::Arc;

use tokio::sync::RwLock;
use util::with_alloctag;
use util::zettacache_stats::DiskIoType;
use util::AlignedVec;

use crate::base_types::DiskLocation;
use crate::block_access::BlockAccess;

#[derive(Debug)]
#[must_use]
pub struct AggregatingWriter {
block_access: Arc<BlockAccess>,
capacity: usize,
pending: Option<PendingWrite>,
flushes: Arc<RwLock<()>>,
}

#[derive(Debug)]
struct PendingWrite {
location: DiskLocation,
vec: AlignedVec,
}

impl AggregatingWriter {
pub fn new(block_access: Arc<BlockAccess>, capacity: usize) -> Self {
Self {
block_access,
capacity,
pending: None,
flushes: Default::default(),
}
}

pub fn write(&mut self, location: DiskLocation, data: &[u8]) {
loop {
let pending = match &mut self.pending {
Some(pending) => pending,
None => {
let mut vec = with_alloctag("AggregatingWriter PendingWrite", || {
AlignedVec::with_capacity(
self.capacity,
self.block_access.round_up_to_sector(1),
)
});
vec.extend_from_slice(data);
self.pending = Some(PendingWrite { location, vec });
return;
}
};
if pending.location + pending.vec.len() == location
&& pending.vec.unused_capacity() >= data.len()
{
pending.vec.extend_from_slice(data);
return;
} else {
self.initiate_flush();
}
}
}

fn initiate_flush(&mut self) {
if let Some(pending) = self.pending.take() {
assert!(!pending.vec.is_empty());
let block_access = self.block_access.clone();
// Take the reader lock so that flush() can wait for the spawned task to complete.
// The unwrap is safe because the writer lock is only taken by flush(), which has
// exclusive access to the AggregatingWriter, so we can't be concurrently calling
// initiate_flush().
let guard = self.flushes.clone().try_read_owned().unwrap();
tokio::spawn(async move {
block_access
.write_raw(
pending.location,
pending.vec.into(),
DiskIoType::MaintenanceWrite,
)
.await;
drop(guard);
});
}
}

pub async fn flush(mut self) {
self.initiate_flush();
// Obtain the flushes write lock to wait for all read lock holders (in progress flush
// tasks) to complete and drop the read locks.
self.flushes.write().await;
}
}

impl Drop for AggregatingWriter {
fn drop(&mut self) {
if self.pending.is_some() || self.flushes.try_write().is_err() {
panic!("AggregatingWriter dropped without flushing");
}
}
}
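
A minimal usage sketch of the new writer (illustrative, not part of the patch; the real caller is in block_based_log below). Writes to contiguous DiskLocations are coalesced into a single pending buffer, and flush() must be awaited so the spawned write_raw() tasks complete before the writer is dropped:

use std::sync::Arc;

use crate::aggregating_writer::AggregatingWriter;
use crate::base_types::DiskLocation;
use crate::block_access::BlockAccess;

async fn write_two_contiguous_buffers(
    block_access: Arc<BlockAccess>,
    location: DiskLocation,
    a: &[u8],
    b: &[u8],
) {
    // A capacity of 256 KiB is an arbitrary illustrative value.
    let mut writer = AggregatingWriter::new(block_access, 256 * 1024);
    writer.write(location, a); // starts a new pending buffer
    writer.write(location + a.len(), b); // contiguous, appended to the same buffer
    writer.flush().await; // issues one aggregated write_raw() and waits for it
}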
31 changes: 20 additions & 11 deletions cmd/zfs_object_agent/zettacache/src/block_access.rs
Expand Up @@ -31,6 +31,7 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use util::from64::AsUsize;
use util::iter_wrapping;
use util::measure;
use util::serde::from_json_slice;
@@ -51,7 +52,10 @@ use crate::base_types::Extent;
tunable! {
static ref MIN_SECTOR_SIZE: usize = 512;
static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32;
static ref DISK_WRITE_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::mib(1);
// Stop aggregating if run would exceed DISK_WRITE_MAX_AGGREGATION_SIZE
pub static ref DISK_WRITE_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::kib(128);
// CHUNK must be > MAX_AGG_SIZE, see Disk::write()
static ref DISK_WRITE_CHUNK: ByteSize = ByteSize::mib(1);
static ref DISK_WRITE_QUEUE_EMPTY_DELAY: Duration = Duration::from_millis(1);
static ref DISK_METADATA_WRITE_MAX_QUEUE_DEPTH: usize = 16;
pub static ref DISK_READ_MAX_QUEUE_DEPTH: usize = 64;
@@ -275,7 +279,7 @@ impl Disk {
if !readonly {
for rx in writer_rxs {
std::thread::spawn(move || {
Self::aggregating_writer_thread(
Self::writer_thread(
file,
&io_stats.stats[DiskIoType::WriteDataForInsert],
sector_size,
@@ -285,7 +289,7 @@
}
for rx in metadata_writer_rxs {
std::thread::spawn(move || {
Self::aggregating_writer_thread(
Self::writer_thread(
file,
&io_stats.stats[DiskIoType::MaintenanceWrite],
sector_size,
@@ -354,7 +358,7 @@ impl Disk {
async move { measure!().fut(rx).await.unwrap() }
}

fn aggregating_writer_thread(
fn writer_thread(
file: &'static File,
stat_values: &'static IoStatValues,
sector_size: usize,
@@ -370,6 +374,10 @@
return (Vec::new(), 0);
};
for (&offset, message) in iter {
if len > 0 && len + message.bytes.len() > DISK_WRITE_MAX_AGGREGATION_SIZE.as_usize()
{
break;
}
if offset == run[0] + len as u64 {
run.push(offset);
len += message.bytes.len();
@@ -489,13 +497,14 @@ impl Disk {
_ => panic!("invalid {:?} for write", io_type),
};
// Dispatch this write to a writer thread, determined based on its offset. The first
// DISK_WRITE_MAX_AGGREGATION_SIZE (default 1MB) of the disk goes to the first thread,
// the second chunk to the second thread, and so on, wrapping back around to the first
// thread. Note that each block allocator slab (16MB) is mapped to multiple threads, so
// the work is distributed to multiple threads even when it's concentrated among a small
// number of slabs.
let writer =
usize::from64(offset / DISK_WRITE_MAX_AGGREGATION_SIZE.as_u64() % txs.len() as u64);
// DISK_WRITE_CHUNK (default 1MB) of the disk goes to the first thread, the second chunk
// to the second thread, and so on, wrapping back around to the first thread. Note that
// each block allocator slab (32MB) is mapped to multiple threads, so the work is
// distributed to multiple threads even when it's concentrated among a small number of
// slabs. The CHUNK (1MB) is larger than the DISK_WRITE_MAX_AGGREGATION_SIZE (128KB) so
// that we can find aggregations that cross MAX_AGG_SIZE boundaries (e.g. from offsets
// 100KB to 228KB).
let writer = usize::from64(offset / DISK_WRITE_CHUNK.as_u64() % txs.len() as u64);
txs[writer]
.send(message)
.unwrap_or_else(|e| panic!("writer_txs[{}].send: {}", writer, e));
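
To make the dispatch arithmetic above concrete, a small illustrative sketch (the writer-thread count of 4 is an assumption, not a value from the patch):

// Recomputes the dispatch rule with concrete numbers.
fn writer_index(offset: u64) -> usize {
    const CHUNK: u64 = 1 << 20; // DISK_WRITE_CHUNK = 1 MiB
    const NUM_WRITERS: u64 = 4; // assumed thread count, for illustration
    (offset / CHUNK % NUM_WRITERS) as usize
}
// writer_index(5 * (1 << 20) + 512 * 1024) == 1  (chunk 5, 5 % 4 == 1)
// writer_index(6 * (1 << 20)) == 2               (chunk 6, 6 % 4 == 2)

Because DISK_WRITE_MAX_AGGREGATION_SIZE (128 KiB) is much smaller than the 1 MiB chunk, two adjacent writes that straddle a 128 KiB boundary still map to the same writer thread and can be aggregated there.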
23 changes: 14 additions & 9 deletions cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
use bytesize::ByteSize;
use derivative::Derivative;
use futures::stream;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures_core::Stream;
use log::*;
@@ -21,15 +20,18 @@ use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use tokio_stream::wrappers::ReceiverStream;
use util::from64::AsUsize;
use util::measure;
use util::tunable;
use util::with_alloctag;
use util::zettacache_stats::DiskIoType;
use util::From64;

use crate::aggregating_writer::AggregatingWriter;
use crate::base_types::*;
use crate::block_access::BlockAccess;
use crate::block_access::EncodeType;
use crate::block_access::DISK_WRITE_MAX_AGGREGATION_SIZE;
use crate::slab_allocator::SlabAccess;
use crate::slab_allocator::SlabAllocator;
use crate::slab_allocator::SlabAllocatorBuilder;
@@ -182,7 +184,7 @@ impl<T: BlockBasedLogEntry> BlockBasedLogPhys<T> {
.map(|(_, extent)| extent)
.collect::<Vec<_>>();

// Just buffer a single (16MB) extent between the two tasks.
// Just buffer a single (32MB) extent between the two tasks.
let (extent_tx, mut extent_rx) = tokio::sync::mpsc::channel(1);

{
@@ -322,7 +324,12 @@ impl<T: BlockBasedLogEntry> BlockBasedLog<T> {
where
F: FnMut(ChunkId, LogOffset, T),
{
let writes_stream = FuturesUnordered::new();
// We aggregate more than the BlockAccess layer's MAX_AGGREGATE_SIZE, so that it will not
// attempt to aggregate our writes.
let mut writer = AggregatingWriter::new(
self.block_access.clone(),
2 * DISK_WRITE_MAX_AGGREGATION_SIZE.as_usize(),
);

let mut remaining_entries = self.pending_entries.as_slice();
let mut max_entries = None;
@@ -376,17 +383,15 @@ impl<T: BlockBasedLogEntry> BlockBasedLog<T> {
self.phys.next_chunk = self.phys.next_chunk.next();
self.phys.next_chunk_offset.0 += raw_size;

writes_stream.push(self.block_access.write_raw(
extent.location,
raw_chunk,
DiskIoType::MaintenanceWrite,
));
writer.write(extent.location, &raw_chunk);

// head is consumed
remaining_entries = tail;
}

writes_stream.count().await;
measure!("BlockBasedLog::flush_impl() AggregatingWriter::flush()")
.fut_timed(writer.flush())
.await;
self.pending_entries.truncate(0);
}

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -5,6 +5,7 @@
#![deny(clippy::print_stdout)]
#![deny(clippy::print_stderr)]

mod aggregating_writer;
mod atime_histogram;
pub mod base_types;
mod block_access;