diff --git a/.github/workflows/pr-commitlint.yml b/.github/workflows/pr-commitlint.yml
index c93aa1000..dcdbd7a6b 100644
--- a/.github/workflows/pr-commitlint.yml
+++ b/.github/workflows/pr-commitlint.yml
@@ -21,5 +21,14 @@ jobs:
           if [ ! ${{ github.ref }} = "refs/heads/staging" ]; then
             first_commit=${{ github.event.pull_request.base.sha }}
             last_commit=${{ github.event.pull_request.head.sha }}
+            # Ensure code-review commits don't get merged
+            sed "s/code-review-rule': \[0/code-review-rule': [2/g" -i commitlint.config.js
             npx commitlint --from $first_commit --to $last_commit -V
+
+            git log --pretty=format:%s $first_commit..$last_commit > ./subjects
+            duplicates="$(cat ./subjects | sort | uniq -D)"
+            if [ "$duplicates" != "" ]; then
+              echo -e "Duplicate commits found:\n$duplicates" >&2
+              exit 1
+            fi
           fi
diff --git a/commitlint.config.js b/commitlint.config.js
index 19a6eb5a8..3b23cbd01 100644
--- a/commitlint.config.js
+++ b/commitlint.config.js
@@ -2,10 +2,25 @@ module.exports = {
   extends: ['@commitlint/config-conventional'],
   rules: {
     'type-enum': [2, 'always', ['build', 'chore', 'ci', 'docs', 'feat', 'fix', 'perf', 'refactor', 'revert', 'style', 'test', 'example']],
+    'code-review-rule': [0, 'always'],
   },
   defaultIgnores: false,
   ignores: [
-    (message) => message.startsWith('chore(bors): merge pull request #'),
-    (message) => message.startsWith('Merge #')
-  ]
+    (message) => message.startsWith('chore(bors): merge pull request #'),
+    (message) => message.startsWith('Merge #')
+  ],
+  plugins: [
+    {
+      rules: {
+        'code-review-rule': ({subject}) => {
+          const REVIEW_COMMENTS = `Please don't merge code-review commits, instead squash them in the parent commit`;
+          if (subject.includes('code-review')) return [ false, REVIEW_COMMENTS ];
+          if (subject.includes('review comment')) return [ false, REVIEW_COMMENTS ];
+          if (subject.includes('address comment')) return [ false, REVIEW_COMMENTS ];
+          if (subject.includes('addressed comment')) return [ false, REVIEW_COMMENTS ];
+          return [ true ];
+        },
+      },
+    },
+  ],
 }
diff --git a/io-engine-tests/src/lib.rs b/io-engine-tests/src/lib.rs
index 9f4996fe2..f3163c71d 100644
--- a/io-engine-tests/src/lib.rs
+++ b/io-engine-tests/src/lib.rs
@@ -17,7 +17,7 @@ use io_engine::{
     core::{MayastorEnvironment, Mthread},
     logger,
     logger::LogFormat,
-    rebuild::{RebuildJob, RebuildState},
+    rebuild::{NexusRebuildJob, RebuildState},
 };
 
 pub mod bdev;
@@ -457,7 +457,7 @@ pub async fn wait_for_rebuild(
     timeout: Duration,
 ) {
     let (s, r) = unbounded::<()>();
-    let job = match RebuildJob::lookup(&dst_uri) {
+    let job = match NexusRebuildJob::lookup(&dst_uri) {
         Ok(job) => job,
         Err(_) => return,
     };
@@ -490,7 +490,7 @@ pub async fn wait_for_rebuild(
         error
     });
     reactor_poll!(r);
-    if let Ok(job) = RebuildJob::lookup(&dst_uri) {
+    if let Ok(job) = NexusRebuildJob::lookup(&dst_uri) {
         job.stats().await;
     }
     t.join().unwrap().unwrap();
diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
index 876c66af0..44e313193 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
@@ -18,8 +18,8 @@ use crate::{
     eventing::{EventMetaGen, EventWithMeta},
     rebuild::{
         HistoryRecord,
+        NexusRebuildJob,
         RebuildError,
-        RebuildJob,
         RebuildJobOptions,
         RebuildState,
         RebuildStats,
@@ -139,8 +139,8 @@ impl<'n> Nexus<'n> {
         self.reconfigure(DrEvent::ChildRebuild).await;
 
         // Stop the I/O log and create a rebuild map from it.
-        // As this is done after the reconfiguraion, any new write I/Os will
-        // now reach the destionation child, and no rebuild will be required
+        // As this is done after the reconfiguration, any new write I/Os will
+        // now reach the destination child, and no rebuild will be required
         // for them.
         let map = self
             .lookup_child(&dst_child_uri)
@@ -186,7 +186,7 @@ impl<'n> Nexus<'n> {
             verify_mode,
         };
 
-        RebuildJob::new(
+        NexusRebuildJob::new(
             &self.name,
             src_child_uri,
             dst_child_uri,
@@ -202,7 +202,7 @@ impl<'n> Nexus<'n> {
             },
         )
         .await
-        .and_then(RebuildJob::store)
+        .and_then(NexusRebuildJob::store)
         .context(nexus_err::CreateRebuild {
             child: dst_child_uri.to_owned(),
             name: self.name.clone(),
@@ -211,7 +211,7 @@ impl<'n> Nexus<'n> {
 
     /// Translates the job into a new history record and pushes into
     /// the history.
-    fn create_history_record(&self, job: Arc<RebuildJob>) {
+    fn create_history_record(&self, job: Arc<NexusRebuildJob>) {
         let Some(rec) = job.history_record() else {
             error!("{self:?}: try to get history record on unfinished job");
             return;
@@ -330,7 +330,7 @@ impl<'n> Nexus<'n> {
     pub async fn cancel_rebuild_jobs(&self, src_uri: &str) -> Vec<String> {
         info!("{:?}: cancel rebuild jobs from '{}'...", self, src_uri);
 
-        let src_jobs = RebuildJob::lookup_src(src_uri);
+        let src_jobs = NexusRebuildJob::lookup_src(src_uri);
         let mut terminated_jobs = Vec::new();
         let mut rebuilding_children = Vec::new();
@@ -375,8 +375,8 @@ impl<'n> Nexus<'n> {
     pub(crate) fn rebuild_job(
         &self,
         dst_child_uri: &str,
-    ) -> Result<Arc<RebuildJob>, Error> {
-        RebuildJob::lookup(dst_child_uri).map_err(|_| {
+    ) -> Result<Arc<NexusRebuildJob>, Error> {
+        NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
             Error::RebuildJobNotFound {
                 child: dst_child_uri.to_owned(),
                 name: self.name.to_owned(),
@@ -389,9 +389,9 @@ impl<'n> Nexus<'n> {
     pub(crate) fn rebuild_job_mut(
         &self,
         dst_child_uri: &str,
-    ) -> Result<Arc<RebuildJob>, Error> {
+    ) -> Result<Arc<NexusRebuildJob>, Error> {
         let name = self.name.clone();
-        RebuildJob::lookup(dst_child_uri).map_err(|_| {
+        NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
             Error::RebuildJobNotFound {
                 child: dst_child_uri.to_owned(),
                 name,
diff --git a/io-engine/src/bdev/nexus/nexus_child.rs b/io-engine/src/bdev/nexus/nexus_child.rs
index e53227c5d..1b2e0057a 100644
--- a/io-engine/src/bdev/nexus/nexus_child.rs
+++ b/io-engine/src/bdev/nexus/nexus_child.rs
@@ -29,7 +29,7 @@ use crate::{
         VerboseError,
     },
     persistent_store::PersistentStore,
-    rebuild::{RebuildJob, RebuildMap},
+    rebuild::{NexusRebuildJob, RebuildMap},
 };
 
 use crate::{
@@ -1199,13 +1199,15 @@ impl<'c> NexusChild<'c> {
     /// TODO
     pub(super) fn remove_rebuild_job(
         &self,
-    ) -> Option<Arc<RebuildJob>> {
-        RebuildJob::remove(&self.name).ok()
+    ) -> Option<Arc<NexusRebuildJob>> {
+        NexusRebuildJob::remove(&self.name).ok()
     }
 
     /// Return the rebuild job which is rebuilding this child, if rebuilding.
-    pub(crate) fn rebuild_job(&self) -> Option<Arc<RebuildJob>> {
-        RebuildJob::lookup(&self.name).ok()
+    pub(crate) fn rebuild_job(
+        &self,
+    ) -> Option<Arc<NexusRebuildJob>> {
+        NexusRebuildJob::lookup(&self.name).ok()
     }
 
     /// Return the rebuild progress on this child, if rebuilding.
diff --git a/io-engine/src/eventing/nexus_events.rs b/io-engine/src/eventing/nexus_events.rs
index 93085cff8..81bf1c958 100644
--- a/io-engine/src/eventing/nexus_events.rs
+++ b/io-engine/src/eventing/nexus_events.rs
@@ -11,10 +11,10 @@ use crate::{
     bdev::{nexus, nexus::NexusChild},
     core::{MayastorEnvironment, VerboseError},
     eventing::{Event, EventMetaGen, EventWithMeta},
-    rebuild::{RebuildJob, RebuildState},
+    rebuild::{NexusRebuildJob, RebuildState},
 };
 
-impl EventMetaGen for RebuildJob {
+impl EventMetaGen for NexusRebuildJob {
     fn meta(&self) -> EventMeta {
         let rebuild_status = match self.state() {
             RebuildState::Init | RebuildState::Running => {
diff --git a/io-engine/src/grpc/v0/nexus_grpc.rs b/io-engine/src/grpc/v0/nexus_grpc.rs
index 4528d8d54..9e30e67d7 100644
--- a/io-engine/src/grpc/v0/nexus_grpc.rs
+++ b/io-engine/src/grpc/v0/nexus_grpc.rs
@@ -21,7 +21,7 @@ use crate::{
         PtplFileOps,
     },
     core::{Protocol, Share},
-    rebuild::RebuildJob,
+    rebuild::NexusRebuildJob,
 };
 
 fn map_fault_reason(r: FaultReason) -> ChildStateReason {
@@ -137,7 +137,7 @@ impl<'n> Nexus<'n> {
                 }
                 children
             },
-            rebuilds: RebuildJob::count() as u32,
+            rebuilds: NexusRebuildJob::count() as u32,
             allowed_hosts: self.allowed_hosts(),
         }
     }
@@ -165,7 +165,7 @@ impl<'n> Nexus<'n> {
                 }
                 children
             },
-            rebuilds: RebuildJob::count() as u32,
+            rebuilds: NexusRebuildJob::count() as u32,
             ana_state: ana_state as i32,
             allowed_hosts: self.allowed_hosts(),
         }
diff --git a/io-engine/src/rebuild/bdev_rebuild.rs b/io-engine/src/rebuild/bdev_rebuild.rs
new file mode 100644
index 000000000..9bdfe2459
--- /dev/null
+++ b/io-engine/src/rebuild/bdev_rebuild.rs
@@ -0,0 +1,144 @@
+use std::{
+    ops::{Deref, Range},
+    rc::Rc,
+};
+
+use super::{
+    rebuild_descriptor::RebuildDescriptor,
+    rebuild_error::RebuildError,
+    rebuild_job_backend::RebuildBackend,
+    rebuild_task::{RebuildTasks, TaskResult},
+    RebuildJob,
+    RebuildJobOptions,
+    SEGMENT_TASKS,
+};
+
+use crate::gen_rebuild_instances;
+
+/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
+/// from source_hdl and writes into destination_hdl from specified start to end.
+pub struct BdevRebuildJob(RebuildJob);
+
+impl std::fmt::Debug for BdevRebuildJob {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+impl Deref for BdevRebuildJob {
+    type Target = RebuildJob;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl BdevRebuildJob {
+    /// Creates a new RebuildJob which rebuilds from source URI to target URI
+    /// from start to end (of the data partition); notify_fn callback is called
+    /// when the rebuild state is updated - with the source and destination
+    /// bdev URIs as arguments.
+    pub async fn new(
+        src_uri: &str,
+        dst_uri: &str,
+        range: Option<Range<u64>>,
+        options: RebuildJobOptions,
+        notify_fn: fn(&str, &str) -> (),
+    ) -> Result<Self, RebuildError> {
+        let descriptor =
+            RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
+        let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
+        let backend =
+            BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;
+
+        RebuildJob::from_backend(backend).await.map(Self)
+    }
+}
+
+gen_rebuild_instances!(BdevRebuildJob);
+
+/// A rebuild job which is responsible for rebuilding from
+/// source to target of the `RebuildDescriptor`.
+pub(super) struct BdevRebuildJobBackend {
+    /// The next block to be rebuilt.
+    next: u64,
+    /// A pool of tasks which perform the actual data rebuild.
+    task_pool: RebuildTasks,
+    /// A generic rebuild descriptor.
+    descriptor: Rc<RebuildDescriptor>,
+    /// Notification callback with src and dst uri's.
+    notify_fn: fn(&str, &str) -> (),
+}
+
+#[async_trait::async_trait(?Send)]
+impl RebuildBackend for BdevRebuildJobBackend {
+    fn on_state_change(&mut self) {
+        (self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
+    }
+
+    fn common_desc(&self) -> &RebuildDescriptor {
+        &self.descriptor
+    }
+
+    fn task_pool(&self) -> &RebuildTasks {
+        &self.task_pool
+    }
+
+    fn schedule_task_by_id(&mut self, id: usize) -> bool {
+        if self.next >= self.descriptor.range.end {
+            false
+        } else {
+            let next = std::cmp::min(
+                self.next + self.descriptor.segment_size_blks,
+                self.descriptor.range.end,
+            );
+            self.task_pool.schedule_segment_rebuild(
+                id,
+                self.next,
+                self.descriptor.clone(),
+            );
+            self.task_pool.active += 1;
+            self.next = next;
+            true
+        }
+    }
+
+    async fn await_one_task(&mut self) -> Option<TaskResult> {
+        self.task_pool.await_one_task().await
+    }
+}
+
+impl std::fmt::Debug for BdevRebuildJobBackend {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BdevRebuildJob")
+            .field("next", &self.next)
+            .finish()
+    }
+}
+impl std::fmt::Display for BdevRebuildJobBackend {
+    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        Ok(())
+    }
+}
+
+impl BdevRebuildJobBackend {
+    /// Creates a new RebuildJob which rebuilds from source URI to target URI
+    /// from start to end (of the data partition); notify_fn callback is called
+    /// when the rebuild state is updated - with the source and destination
+    /// URI as arguments.
+    pub async fn new(
+        task_pool: RebuildTasks,
+        notify_fn: fn(&str, &str) -> (),
+        descriptor: RebuildDescriptor,
+    ) -> Result<Self, RebuildError> {
+        let be = Self {
+            next: descriptor.range.start,
+            task_pool,
+            descriptor: Rc::new(descriptor),
+            notify_fn,
+        };
+
+        info!("{be}: backend created");
+
+        Ok(be)
+    }
+}
diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs
index bfc9a73e7..5aff584c4 100644
--- a/io-engine/src/rebuild/mod.rs
+++ b/io-engine/src/rebuild/mod.rs
@@ -1,5 +1,8 @@
+mod bdev_rebuild;
+mod nexus_rebuild;
 mod rebuild_descriptor;
 mod rebuild_error;
+mod rebuild_instances;
 mod rebuild_job;
 mod rebuild_job_backend;
 mod rebuild_map;
@@ -7,21 +10,23 @@ mod rebuild_state;
 mod rebuild_stats;
 mod rebuild_task;
 
+pub use bdev_rebuild::BdevRebuildJob;
+pub use nexus_rebuild::NexusRebuildJob;
 use rebuild_descriptor::RebuildDescriptor;
 pub(crate) use rebuild_error::RebuildError;
 use rebuild_job::RebuildOperation;
 pub use rebuild_job::{RebuildJob, RebuildJobOptions, RebuildVerifyMode};
 use rebuild_job_backend::{
     RebuildFBendChan,
-    RebuildJobBackend,
+    RebuildJobBackendManager,
     RebuildJobRequest,
 };
-pub(crate) use rebuild_map::RebuildMap;
+pub use rebuild_map::RebuildMap;
 pub use rebuild_state::RebuildState;
 use rebuild_state::RebuildStates;
 pub(crate) use rebuild_stats::HistoryRecord;
 pub use rebuild_stats::RebuildStats;
-use rebuild_task::{RebuildTask, RebuildTasks, TaskResult};
+use rebuild_task::{RebuildTasks, TaskResult};
 
 /// Number of concurrent copy tasks per rebuild job
 const SEGMENT_TASKS: usize = 16;
@@ -31,12 +36,12 @@ pub(crate) const SEGMENT_SIZE: u64 =
     spdk_rs::libspdk::SPDK_BDEV_LARGE_BUF_MAX_SIZE as u64;
 
 /// Checks whether a range is contained within another range
-trait Within<T> {
+trait WithinRange<T> {
     /// True if `self` is contained within `right`, otherwise false
     fn within(&self, right: std::ops::Range<T>) -> bool;
 }
 
-impl Within<u64> for std::ops::Range<u64> {
+impl WithinRange<u64> for std::ops::Range<u64> {
     fn within(&self, right: std::ops::Range<u64>) -> bool {
         // also make sure ranges don't overflow
         self.start < self.end
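A note for reviewers on the renamed trait: the hunk above is cut off mid-body by the diff context, but the contract is small enough to sketch standalone. The completion below is an assumption based on how `RebuildDescriptor::validate` (further down) uses it — containment plus a guard against empty/inverted ranges — not a verbatim copy of the remaining lines:

```rust
/// Illustrative sketch of the `WithinRange` contract; the `u64` impl body is
/// a plausible completion of the truncated hunk above, not the verbatim one.
trait WithinRange<T> {
    fn within(&self, right: std::ops::Range<T>) -> bool;
}

impl WithinRange<u64> for std::ops::Range<u64> {
    fn within(&self, right: std::ops::Range<u64>) -> bool {
        // also make sure ranges don't overflow
        self.start < self.end
            && self.start >= right.start
            && self.end <= right.end
    }
}

fn main() {
    assert!((10 .. 20).within(0 .. 100)); // fully contained
    assert!(!(10 .. 20).within(15 .. 100)); // starts before `right`
    assert!(!(5 .. 5).within(0 .. 100)); // empty ranges are rejected
}
```

This is what `validate` relies on to refuse a rebuild range that falls outside either device's data partition.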
diff --git a/io-engine/src/rebuild/nexus_rebuild.rs b/io-engine/src/rebuild/nexus_rebuild.rs
new file mode 100644
index 000000000..8bb1f33f2
--- /dev/null
+++ b/io-engine/src/rebuild/nexus_rebuild.rs
@@ -0,0 +1,279 @@
+use snafu::ResultExt;
+use spdk_rs::LbaRange;
+use std::{
+    ops::{Deref, Range},
+    rc::Rc,
+};
+
+use crate::{
+    core::{DescriptorGuard, UntypedBdev},
+    gen_rebuild_instances,
+    rebuild::{
+        rebuild_error::{RangeLockFailed, RangeUnlockFailed},
+        rebuild_task::{RebuildTask, RebuildTaskCopier},
+    },
+};
+
+use super::{
+    rebuild_descriptor::RebuildDescriptor,
+    rebuild_error::{BdevNotFound, RebuildError},
+    rebuild_job::RebuildJob,
+    rebuild_job_backend::RebuildBackend,
+    rebuild_task::{RebuildTasks, TaskResult},
+    RebuildJobOptions,
+    SEGMENT_TASKS,
+};
+
+/// A Nexus rebuild job is responsible for managing a rebuild (copy) which reads
+/// from source_hdl and writes into destination_hdl from specified start to end.
+/// Each copy is synchronized with the nexus bdev using ranged locks to ensure
+/// that there is no concurrent access between the nexus and the rebuild.
+/// This is a frontend interface that communicates with a backend runner which
+/// is the one responsible for the reading/writing of the data.
+pub struct NexusRebuildJob(RebuildJob);
+
+impl std::fmt::Debug for NexusRebuildJob {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+impl Deref for NexusRebuildJob {
+    type Target = RebuildJob;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl NexusRebuildJob {
+    /// Creates a new RebuildJob which rebuilds from source URI to target URI
+    /// from start to end (of the data partition); notify_fn callback is called
+    /// when the rebuild state is updated - with the nexus and destination
+    /// URI as arguments.
+    /// todo: Should we use a builder? Example:
+    /// NexusRebuild:
+    /// Builder::new(src, dst).with_range().with_options().with_nexus().build()
+    /// GenericRebuild:
+    /// Builder::new(src, dst).with_range().with_options().build()
+    pub async fn new(
+        nexus_name: &str,
+        src_uri: &str,
+        dst_uri: &str,
+        range: Range<u64>,
+        options: RebuildJobOptions,
+        notify_fn: fn(String, String) -> (),
+    ) -> Result<Self, RebuildError> {
+        let descriptor =
+            RebuildDescriptor::new(src_uri, dst_uri, Some(range), options)
+                .await?;
+        let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
+
+        let backend = NexusRebuildJobBackend::new(
+            nexus_name, tasks, notify_fn, descriptor,
+        )
+        .await?;
+
+        RebuildJob::from_backend(backend).await.map(Self)
+    }
+}
+
+gen_rebuild_instances!(NexusRebuildJob);
+
+/// Contains all descriptors and their associated information which allows the
+/// tasks to copy/rebuild data from source to destination.
+pub(super) struct NexusRebuildDescriptor {
+    /// Name of the nexus associated with the rebuild job.
+    pub nexus_name: String,
+    /// Nexus Descriptor so we can lock its ranges when rebuilding a segment.
+    pub(super) nexus: DescriptorGuard<()>,
+    /// The generic rebuild descriptor for copying from source to target.
+    pub(super) common: RebuildDescriptor,
+}
+impl Deref for NexusRebuildDescriptor {
+    type Target = RebuildDescriptor;
+
+    fn deref(&self) -> &Self::Target {
+        &self.common
+    }
+}
+
+/// A nexus-specific rebuild job which is responsible for rebuilding
+/// the common `RebuildDescriptor` with the addition of the nexus guard
+/// as a means of locking the range which is being rebuilt ensuring
+/// there are no concurrent writes to the same range between the
+/// user IO (through the nexus) and the rebuild itself.
+pub(super) struct NexusRebuildJobBackend {
+    /// The next block to be rebuilt.
+    next: u64,
+    /// A pool of tasks which perform the actual data rebuild.
+    task_pool: RebuildTasks,
+    /// A nexus rebuild specific descriptor.
+    descriptor: Rc<NexusRebuildDescriptor>,
+    /// Notification callback which existing nexus uses to sync
+    /// with rebuild updates.
+    notify_fn: fn(String, String) -> (),
+}
+
+#[async_trait::async_trait(?Send)]
+impl RebuildBackend for NexusRebuildJobBackend {
+    fn on_state_change(&mut self) {
+        (self.notify_fn)(
+            self.descriptor.nexus_name.clone(),
+            self.descriptor.dst_uri.clone(),
+        );
+    }
+
+    fn common_desc(&self) -> &RebuildDescriptor {
+        &self.descriptor
+    }
+
+    fn task_pool(&self) -> &RebuildTasks {
+        &self.task_pool
+    }
+
+    fn schedule_task_by_id(&mut self, id: usize) -> bool {
+        match self.send_segment_task(id) {
+            Some(next) => {
+                self.task_pool.active += 1;
+                self.next = next;
+                true
+            }
+            // we've already got enough tasks to rebuild the destination
+            None => false,
+        }
+    }
+    async fn await_one_task(&mut self) -> Option<TaskResult> {
+        self.task_pool.await_one_task().await
+    }
+}
+
+impl std::fmt::Debug for NexusRebuildJobBackend {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("NexusRebuildJob")
+            .field("nexus", &self.descriptor.nexus_name)
+            .field("next", &self.next)
+            .finish()
+    }
+}
+impl std::fmt::Display for NexusRebuildJobBackend {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "on nexus '{nex}'", nex = self.descriptor.nexus_name)
+    }
+}
+
+impl NexusRebuildJobBackend {
+    /// Creates a new RebuildJob which rebuilds from source URI to target URI
+    /// from start to end (of the data partition); notify_fn callback is called
+    /// when the rebuild state is updated - with the nexus and destination
+    /// URI as arguments.
+    pub async fn new(
+        nexus_name: &str,
+        task_pool: RebuildTasks,
+        notify_fn: fn(String, String) -> (),
+        descriptor: RebuildDescriptor,
+    ) -> Result<Self, RebuildError> {
+        let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false)
+            .context(BdevNotFound {
+                bdev: nexus_name.to_string(),
+            })?;
+
+        let be = Self {
+            next: descriptor.range.start,
+            task_pool,
+            descriptor: Rc::new(NexusRebuildDescriptor {
+                nexus: nexus_descriptor,
+                nexus_name: nexus_name.to_string(),
+                common: descriptor,
+            }),
+            notify_fn,
+        };
+
+        info!("{be}: backend created");
+
+        Ok(be)
+    }
+
+    /// Sends one segment worth of data in a reactor future and notifies the
+    /// management channel. Returns the next segment offset to rebuild, if any.
+    fn send_segment_task(&mut self, id: usize) -> Option<u64> {
+        if self.next >= self.descriptor.range.end {
+            None
+        } else {
+            let next = std::cmp::min(
+                self.next + self.descriptor.segment_size_blks,
+                self.descriptor.range.end,
+            );
+            self.task_pool.schedule_segment_rebuild(
+                id,
+                self.next,
+                self.descriptor.clone(),
+            );
+            Some(next)
+        }
+    }
+}
+
+#[async_trait::async_trait(?Send)]
+impl RebuildTaskCopier for NexusRebuildDescriptor {
+    /// Copies one segment worth of data from source into destination. During
+    /// this time the LBA range being copied is locked so that there cannot be
+    /// front end I/O to the same LBA range.
+    ///
+    /// # Safety
+    ///
+    /// The lock and unlock functions internally reference the RangeContext as a
+    /// raw pointer, so rust cannot correctly manage its lifetime. The
+    /// RangeContext MUST NOT be dropped until after the lock and unlock have
+    /// completed.
+    ///
+    /// The use of RangeContext here is safe because it is stored on the stack
+    /// for the duration of the calls to lock and unlock.
+    async fn copy_segment(
+        &self,
+        blk: u64,
+        task: &mut RebuildTask,
+    ) -> Result<bool, RebuildError> {
+        if self.is_blk_sync(blk) {
+            return Ok(false);
+        }
+
+        let len = self.get_segment_size_blks(blk);
+        // The nexus children have metadata and data partitions, whereas the
+        // nexus has a data partition only. Because we are locking the range on
+        // the nexus, we need to calculate the offset from the start of the data
+        // partition.
+        let r = LbaRange::new(blk - self.range.start, len);
+
+        // Wait for LBA range to be locked.
+        // This prevents other I/Os being issued to this LBA range whilst it is
+        // being rebuilt.
+        let lock =
+            self.nexus
+                .lock_lba_range(r)
+                .await
+                .context(RangeLockFailed {
+                    blk,
+                    len,
+                })?;
+
+        // Perform the copy.
+        let result = task.copy_one(blk, self).await;
+
+        // Wait for the LBA range to be unlocked.
+        // This allows other I/Os to be issued to this LBA range once again.
+        self.nexus
+            .unlock_lba_range(lock)
+            .await
+            .context(RangeUnlockFailed {
+                blk,
+                len,
+            })?;
+
+        // In the case of success, mark the segment as already transferred.
+        if result.is_ok() {
+            self.blk_synced(blk);
+        }
+
+        result
+    }
+}
diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs
index f59178543..ebd031998 100644
--- a/io-engine/src/rebuild/rebuild_descriptor.rs
+++ b/io-engine/src/rebuild/rebuild_descriptor.rs
@@ -1,4 +1,5 @@
 use chrono::{DateTime, Utc};
+use snafu::ResultExt;
 use spdk_rs::{
     libspdk::SPDK_NVME_SC_COMPARE_FAILURE,
     DmaBuf,
@@ -7,13 +8,22 @@ use spdk_rs::{
 };
 use std::sync::Arc;
 
-use crate::core::{
-    BlockDeviceDescriptor,
-    BlockDeviceHandle,
-    CoreError,
-    DescriptorGuard,
-    IoCompletionStatus,
-    ReadOptions,
+use crate::{
+    bdev::device_open,
+    bdev_api::bdev_get_name,
+    core::{
+        BlockDevice,
+        BlockDeviceDescriptor,
+        BlockDeviceHandle,
+        CoreError,
+        IoCompletionStatus,
+        ReadOptions,
+    },
+    rebuild::{
+        rebuild_error::{BdevInvalidUri, NoCopyBuffer},
+        WithinRange,
+        SEGMENT_SIZE,
+    },
 };
 
 use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode};
@@ -41,8 +51,6 @@ pub(super) struct RebuildDescriptor {
     /// Pre-opened descriptor for destination block device.
     #[allow(clippy::non_send_fields_in_send_ty)]
     pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
-    /// Nexus Descriptor so we can lock its ranges when rebuilding a segment.
-    pub(super) nexus_descriptor: DescriptorGuard<()>,
     /// Start time of this rebuild.
     pub(super) start_time: DateTime<Utc>,
     /// Rebuild map.
@@ -50,6 +58,93 @@ pub(super) struct RebuildDescriptor {
 }
 
 impl RebuildDescriptor {
+    pub(super) async fn new(
+        src_uri: &str,
+        dst_uri: &str,
+        range: Option<std::ops::Range<u64>>,
+        options: RebuildJobOptions,
+    ) -> Result<Self, RebuildError> {
+        let src_descriptor = device_open(
+            &bdev_get_name(src_uri).context(BdevInvalidUri {
+                uri: src_uri.to_string(),
+            })?,
+            false,
+        )
+        .map_err(|e| RebuildError::BdevNotFound {
+            source: e,
+            bdev: src_uri.to_string(),
+        })?;
+
+        let dst_descriptor = device_open(
+            &bdev_get_name(dst_uri).context(BdevInvalidUri {
+                uri: dst_uri.to_string(),
+            })?,
+            true,
+        )
+        .map_err(|e| RebuildError::BdevNotFound {
+            source: e,
+            bdev: dst_uri.to_string(),
+        })?;
+
+        if src_descriptor.device_name() == dst_descriptor.device_name() {
+            return Err(RebuildError::SameBdev {
+                bdev: src_descriptor.device_name(),
+            });
+        }
+
+        let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
+        let destination_hdl =
+            RebuildDescriptor::io_handle(&*dst_descriptor).await?;
+
+        let range = match range {
+            None => {
+                let dst_size = dst_descriptor.get_device().size_in_bytes();
+                let dst_blk_size = dst_descriptor.get_device().block_len();
+
+                0 .. dst_size / dst_blk_size
+            }
+            Some(range) => range,
+        };
+
+        if !Self::validate(
+            source_hdl.get_device(),
+            destination_hdl.get_device(),
+            &range,
+        ) {
+            return Err(RebuildError::InvalidParameters {});
+        }
+
+        let block_size = dst_descriptor.get_device().block_len();
+        let segment_size_blks = SEGMENT_SIZE / block_size;
+
+        Ok(Self {
+            src_uri: src_uri.to_string(),
+            dst_uri: dst_uri.to_string(),
+            range,
+            options,
+            block_size,
+            segment_size_blks,
+            src_descriptor,
+            dst_descriptor,
+            start_time: Utc::now(),
+            rebuild_map: Arc::new(parking_lot::Mutex::new(None)),
+        })
+    }
+
+    /// Check if the source and destination block devices are compatible for
+    /// rebuild
+    fn validate(
+        source: &dyn BlockDevice,
+        destination: &dyn BlockDevice,
+        range: &std::ops::Range<u64>,
+    ) -> bool {
+        // todo: make sure we don't overwrite the labels
+        let data_partition_start = 0;
+        range.within(data_partition_start .. source.num_blocks())
+            && range.within(data_partition_start .. destination.num_blocks())
+            && source.block_len() == destination.block_len()
+    }
+
     /// Return the size of the segment to be copied.
     #[inline(always)]
     pub(super) fn get_segment_size_blks(&self, blk: u64) -> u64 {
@@ -60,6 +155,14 @@ impl RebuildDescriptor {
         self.segment_size_blks
     }
 
+    /// Allocate memory from the memory pool (the mem is zeroed out)
+    /// with given size and proper alignment for the bdev.
+    pub(super) fn dma_malloc(&self, size: u64) -> Result<DmaBuf, RebuildError> {
+        let src_align = self.src_descriptor.get_device().alignment();
+        let dst_align = self.dst_descriptor.get_device().alignment();
+        DmaBuf::new(size, src_align.max(dst_align)).context(NoCopyBuffer)
+    }
+
     /// Get a `BlockDeviceHandle` for the source.
     #[inline(always)]
     pub(super) async fn src_io_handle(
diff --git a/io-engine/src/rebuild/rebuild_error.rs b/io-engine/src/rebuild/rebuild_error.rs
index a8f302ab5..0150b7764 100644
--- a/io-engine/src/rebuild/rebuild_error.rs
+++ b/io-engine/src/rebuild/rebuild_error.rs
@@ -15,6 +15,10 @@ pub enum RebuildError {
     NoCopyBuffer { source: DmaError },
     #[snafu(display("Failed to validate rebuild job creation parameters"))]
     InvalidParameters {},
+    #[snafu(display(
+        "The same device was specified for both source and destination: {bdev}"
+    ))]
+    SameBdev { bdev: String },
     #[snafu(display("Failed to get a handle for bdev {}", bdev))]
     NoBdevHandle { source: CoreError, bdev: String },
     #[snafu(display("Bdev {} not found", bdev))]
diff --git a/io-engine/src/rebuild/rebuild_instances.rs b/io-engine/src/rebuild/rebuild_instances.rs
new file mode 100644
index 000000000..a452c5fca
--- /dev/null
+++ b/io-engine/src/rebuild/rebuild_instances.rs
@@ -0,0 +1,90 @@
+#[macro_export]
+macro_rules! gen_rebuild_instances {
+    ($T:ty) => {
+        /// List of rebuild jobs indexed by the destination's replica uri.
+        type RebuildJobInstances =
+            std::collections::HashMap<String, std::sync::Arc<$T>>;
+
+        impl $T {
+            /// Get the rebuild job instances container; we ensure that this
+            /// can only ever be called on a properly allocated thread.
+            fn get_instances<'a>(
+            ) -> parking_lot::MutexGuard<'a, RebuildJobInstances> {
+                assert!(
+                    spdk_rs::Thread::is_spdk_thread(),
+                    "not called from SPDK thread"
+                );
+
+                static REBUILD_INSTANCES: once_cell::sync::OnceCell<
+                    parking_lot::Mutex<RebuildJobInstances>,
+                > = once_cell::sync::OnceCell::new();
+
+                REBUILD_INSTANCES
+                    .get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
+                    .lock()
+            }
+
+            /// Returns number of all rebuild jobs of type $T on the system.
+            pub fn count() -> usize {
+                Self::get_instances().len()
+            }
+
+            /// Lookup a rebuild job by its destination uri then remove and
+            /// drop it.
+            pub fn remove(
+                name: &str,
+            ) -> Result<std::sync::Arc<$T>, super::RebuildError> {
+                match Self::get_instances().remove(name) {
+                    Some(job) => Ok(job),
+                    None => Err(RebuildError::JobNotFound {
+                        job: name.to_owned(),
+                    }),
+                }
+            }
+
+            /// Stores a rebuild job in the rebuild job list.
+            pub fn store(self) -> Result<(), super::RebuildError> {
+                let mut rebuild_list = Self::get_instances();
+
+                if rebuild_list.contains_key(self.dst_uri()) {
+                    Err(RebuildError::JobAlreadyExists {
+                        job: self.dst_uri().to_string(),
+                    })
+                } else {
+                    let _ = rebuild_list.insert(
+                        self.dst_uri().to_string(),
+                        std::sync::Arc::new(self),
+                    );
+                    Ok(())
+                }
+            }
+
+            /// Lookup a rebuild job by its destination uri and return it.
+            pub fn lookup(
+                dst_uri: &str,
+            ) -> Result<std::sync::Arc<$T>, super::RebuildError> {
+                if let Some(job) = Self::get_instances().get(dst_uri) {
+                    Ok(job.clone())
+                } else {
+                    Err(RebuildError::JobNotFound {
+                        job: dst_uri.to_owned(),
+                    })
+                }
+            }
+
+            /// Lookup all rebuild jobs with name as its source.
+            pub fn lookup_src(src_uri: &str) -> Vec<std::sync::Arc<$T>> {
+                Self::get_instances()
+                    .iter_mut()
+                    .filter_map(|j| {
+                        if j.1.src_uri() == src_uri {
+                            Some(j.1.clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .collect()
+            }
+        }
+    };
+}
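Since the macro body is dense, here is a hedged sketch of the API surface it generates for each wrapper type. It is illustrative rather than standalone-runnable: it assumes it runs inside the io-engine crate on an SPDK thread (the `assert!` in `get_instances` enforces the latter), and that `job` is a `BdevRebuildJob` created as in the test at the bottom of this diff:

```rust
// Sketch of the registry generated by `gen_rebuild_instances!(BdevRebuildJob)`.
// Assumes an SPDK thread and an already-created `job: BdevRebuildJob`.
fn registry_walkthrough(job: BdevRebuildJob) -> Result<(), RebuildError> {
    let dst = job.dst_uri().to_string();

    // Register the job; a second `store()` against the same destination
    // URI would fail with `RebuildError::JobAlreadyExists`.
    job.store()?;
    assert_eq!(BdevRebuildJob::count(), 1);

    // `lookup` hands back an `Arc` clone keyed by destination URI...
    let shared = BdevRebuildJob::lookup(&dst)?;

    // ...while `lookup_src` filters the registry by source URI.
    assert_eq!(BdevRebuildJob::lookup_src(shared.src_uri()).len(), 1);

    // `remove` takes the job back out of the registry; it is dropped once
    // the last `Arc` goes away.
    BdevRebuildJob::remove(&dst)?;
    Ok(())
}
```

Note the registries are per-type: `NexusRebuildJob` and `BdevRebuildJob` each get their own `static REBUILD_INSTANCES`, which is why `NexusRebuildJob::count()` in the gRPC code above keeps its old meaning.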
diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs
index 4128a9331..c59edbff5 100644
--- a/io-engine/src/rebuild/rebuild_job.rs
+++ b/io-engine/src/rebuild/rebuild_job.rs
@@ -1,30 +1,28 @@
-use std::{
-    collections::HashMap,
-    ops::Range,
-    sync::{Arc, Weak},
-};
+use std::sync::{Arc, Weak};
 
 use chrono::Utc;
 use futures::channel::oneshot;
-use once_cell::sync::OnceCell;
-use spdk_rs::Thread;
 
 use super::{
     HistoryRecord,
     RebuildError,
-    RebuildJobBackend,
+    RebuildJobBackendManager,
     RebuildJobRequest,
     RebuildMap,
     RebuildState,
     RebuildStates,
     RebuildStats,
 };
-use crate::core::{Reactors, VerboseError};
+use crate::{
+    core::{Reactors, VerboseError},
+    rebuild::rebuild_job_backend::RebuildBackend,
+};
 
 /// Rebuild I/O verification mode.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub enum RebuildVerifyMode {
     /// Do not verify rebuild I/Os.
+    #[default]
     None,
     /// Fail rebuild job if I/O verification fails.
     Fail,
@@ -33,7 +31,7 @@
 }
 
 /// Rebuild job options.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct RebuildJobOptions {
     pub verify_mode: RebuildVerifyMode,
 }
@@ -71,8 +69,6 @@ impl std::fmt::Display for RebuildOperation {
 /// is the one responsible for the read/writing of the data.
 #[derive(Debug)]
 pub struct RebuildJob {
-    /// Name of the nexus associated with the rebuild job.
-    pub nexus_name: String,
     /// Source URI of the healthy child to rebuild from.
     src_uri: String,
     /// Target URI of the out of sync child in need of a rebuild.
@@ -88,103 +84,34 @@ pub struct RebuildJob {
 }
 
 impl RebuildJob {
-    /// Creates a new RebuildJob which rebuilds from source URI to target URI
-    /// from start to end (of the data partition); notify_fn callback is called
-    /// when the rebuild state is updated - with the nexus and destination
-    /// URI as arguments.
-    pub async fn new(
-        nexus_name: &str,
-        src_uri: &str,
-        dst_uri: &str,
-        range: Range<u64>,
-        options: RebuildJobOptions,
-        notify_fn: fn(String, String) -> (),
+    /// Creates a new RebuildJob taking a specific backend implementation and
+    /// running the generic backend manager.
+    pub(super) async fn from_backend(
+        backend: impl RebuildBackend + 'static,
     ) -> Result<Self, RebuildError> {
-        // Allocate an instance of the rebuild back-end.
-        let backend = RebuildJobBackend::new(
-            nexus_name,
+        let desc = backend.common_desc();
+        let src_uri = desc.src_uri.to_string();
+        let dst_uri = desc.dst_uri.to_string();
+        let manager = RebuildJobBackendManager::new(backend);
+        let frontend = Self {
             src_uri,
             dst_uri,
-            range.clone(),
-            options,
-            notify_fn,
-        )
-        .await?;
-
-        let frontend = Self {
-            nexus_name: backend.nexus_name.clone(),
-            src_uri: backend.src_uri.clone(),
-            dst_uri: backend.dst_uri.clone(),
-            states: backend.states.clone(),
-            comms: RebuildFBendChan::from(&backend.info_chan),
-            complete_chan: Arc::downgrade(&backend.complete_chan),
-            notify_chan: backend.notify_chan.1.clone(),
+            states: manager.states.clone(),
+            comms: RebuildFBendChan::from(&manager.info_chan),
+            complete_chan: Arc::downgrade(&manager.complete_chan),
+            notify_chan: manager.notify_chan.1.clone(),
         };
 
         // Kick off the rebuild task where it will "live" and await for
         // commands.
-        backend.schedule().await;
+        manager.schedule().await;
 
         Ok(frontend)
     }
 
-    /// Returns number of all rebuild jobs on the system.
-    pub fn count() -> usize {
-        Self::get_instances().len()
-    }
-
-    /// Lookup a rebuild job by its destination uri then remove and drop it.
-    pub fn remove(name: &str) -> Result<Arc<Self>, RebuildError> {
-        match Self::get_instances().remove(name) {
-            Some(job) => Ok(job),
-            None => Err(RebuildError::JobNotFound {
-                job: name.to_owned(),
-            }),
-        }
-    }
-
-    /// Stores a rebuild job in the rebuild job list.
-    pub fn store(self) -> Result<(), RebuildError> {
-        let mut rebuild_list = Self::get_instances();
-
-        if rebuild_list.contains_key(&self.dst_uri) {
-            Err(RebuildError::JobAlreadyExists {
-                job: self.dst_uri,
-            })
-        } else {
-            let _ = rebuild_list.insert(self.dst_uri.clone(), Arc::new(self));
-            Ok(())
-        }
-    }
-
-    /// Lookup a rebuild job by its destination uri and return it.
-    pub fn lookup(dst_uri: &str) -> Result<Arc<Self>, RebuildError> {
-        if let Some(job) = Self::get_instances().get(dst_uri) {
-            Ok(job.clone())
-        } else {
-            Err(RebuildError::JobNotFound {
-                job: dst_uri.to_owned(),
-            })
-        }
-    }
-
-    /// Lookup all rebuilds jobs with name as its source.
-    pub fn lookup_src(src_uri: &str) -> Vec<Arc<Self>> {
-        Self::get_instances()
-            .iter_mut()
-            .filter_map(|j| {
-                if j.1.src_uri == src_uri {
-                    Some(j.1.clone())
-                } else {
-                    None
-                }
-            })
-            .collect()
-    }
-
     /// Schedules the job to start in a future and returns a complete channel
     /// which can be waited on.
-    pub(crate) async fn start(
+    pub async fn start(
         &self,
         map: Option<RebuildMap>,
     ) -> Result<oneshot::Receiver<RebuildState>, RebuildError> {
@@ -324,20 +251,6 @@ impl RebuildJob {
         self.states.read().final_stats().clone()
     }
 
-    /// Get the rebuild job instances container, we ensure that this can only
-    /// ever be called on a properly allocated thread
-    fn get_instances<'a>() -> parking_lot::MutexGuard<'a, RebuildJobInstances> {
-        assert!(Thread::is_spdk_thread(), "not called from SPDK thread");
-
-        static REBUILD_INSTANCES: OnceCell<
-            parking_lot::Mutex<RebuildJobInstances>,
-        > = OnceCell::new();
-
-        REBUILD_INSTANCES
-            .get_or_init(|| parking_lot::Mutex::new(HashMap::new()))
-            .lock()
-    }
-
     /// Client operations are not allowed to skip over previous operations.
     fn exec_client_op(&self, op: RebuildOperation) -> Result<(), RebuildError> {
         self.exec_op(op, false)
@@ -391,9 +304,6 @@ impl RebuildJob {
     }
 }
 
-/// List of rebuild jobs indexed by the destination's replica uri.
-type RebuildJobInstances = HashMap<String, Arc<RebuildJob>>;
-
 #[derive(Debug, Clone)]
 struct RebuildFBendChan {
     sender: async_channel::Sender<RebuildJobRequest>,
diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs
index 2f0600fd6..63a427184 100644
--- a/io-engine/src/rebuild/rebuild_job_backend.rs
+++ b/io-engine/src/rebuild/rebuild_job_backend.rs
@@ -1,43 +1,20 @@
-use std::{
-    fmt::Display,
-    rc::Rc,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-};
+use std::sync::Arc;
 
-use chrono::Utc;
 use crossbeam::channel::{unbounded, Receiver, Sender};
-use futures::{
-    channel::{mpsc, oneshot},
-    FutureExt,
-    StreamExt,
-};
-use snafu::ResultExt;
+use futures::{channel::oneshot, FutureExt, StreamExt};
 
 use super::{
-    rebuild_error::{BdevInvalidUri, BdevNotFound, NoCopyBuffer},
     RebuildDescriptor,
     RebuildError,
-    RebuildJobOptions,
     RebuildMap,
     RebuildState,
     RebuildStates,
     RebuildStats,
-    RebuildTask,
     RebuildTasks,
     TaskResult,
-    Within,
-    SEGMENT_SIZE,
-    SEGMENT_TASKS,
 };
-use crate::{
-    bdev::device_open,
-    bdev_api::bdev_get_name,
-    core::{BlockDevice, Reactors, UntypedBdev},
-};
+use crate::core::Reactors;
 
 /// Request between frontend and backend.
 #[derive(Debug)]
@@ -64,11 +41,8 @@ impl RebuildFBendChan {
             receiver,
         }
     }
-    async fn recv(&mut self) -> Result<RebuildJobRequest, RebuildError> {
-        self.receiver
-            .recv()
-            .await
-            .map_err(|_| RebuildError::FrontendGone {})
+    async fn recv(&mut self) -> Option<RebuildJobRequest> {
+        self.receiver.recv().await.ok()
     }
 
     /// Get a clone of the receive channel.
@@ -85,21 +59,33 @@ impl RebuildFBendChan {
     }
 }
 
+/// Shared interface for different implementations of the rebuild.
+/// A rebuild backend must implement this trait allowing it to
+/// be used by the `RebuildJobManager`.
+#[async_trait::async_trait(?Send)]
+pub(super) trait RebuildBackend:
+    std::fmt::Debug + std::fmt::Display
+{
+    /// Callback for rebuild state change notifications.
+    fn on_state_change(&mut self);
+
+    /// Get a reference to the common rebuild descriptor.
+    fn common_desc(&self) -> &RebuildDescriptor;
+
+    /// Get a reference to the tasks pool.
+    fn task_pool(&self) -> &RebuildTasks;
+
+    /// Schedule new work on the given task by its id.
+    /// Returns false if no further work is required.
+    fn schedule_task_by_id(&mut self, id: usize) -> bool;
+
+    /// Wait for the completion of a task and get the result.
+    /// Each task's completion must be awaited, to ensure that no in-progress
+    /// IO remains when we complete a rebuild.
+    async fn await_one_task(&mut self) -> Option<TaskResult>;
+}
+
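To make the split concrete: everything state- and channel-related stays in the manager below, while a backend only has to answer the five calls above. A minimal skeleton of a hypothetical third backend might look as follows — names are invented for illustration, and since the trait is `pub(super)` this only compiles inside `io_engine::rebuild`; the `Debug`/`Display` impls are required by the supertrait bounds:

```rust
// Hypothetical skeleton of a third `RebuildBackend`; illustration only.
pub(super) struct FlatRebuildJobBackend {
    next: u64,
    task_pool: RebuildTasks,
    descriptor: std::rc::Rc<RebuildDescriptor>,
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for FlatRebuildJobBackend {
    // No frontend callback to invoke in this hypothetical variant.
    fn on_state_change(&mut self) {}

    fn common_desc(&self) -> &RebuildDescriptor {
        &self.descriptor
    }

    fn task_pool(&self) -> &RebuildTasks {
        &self.task_pool
    }

    // Same shape as the bdev backend: hand one segment to task `id`,
    // or report that the whole range has already been scheduled.
    fn schedule_task_by_id(&mut self, id: usize) -> bool {
        if self.next >= self.descriptor.range.end {
            return false;
        }
        let next = std::cmp::min(
            self.next + self.descriptor.segment_size_blks,
            self.descriptor.range.end,
        );
        self.task_pool
            .schedule_segment_rebuild(id, self.next, self.descriptor.clone());
        self.task_pool.active += 1;
        self.next = next;
        true
    }

    async fn await_one_task(&mut self) -> Option<TaskResult> {
        self.task_pool.await_one_task().await
    }
}

impl std::fmt::Debug for FlatRebuildJobBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FlatRebuildJob").field("next", &self.next).finish()
    }
}
impl std::fmt::Display for FlatRebuildJobBackend {
    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}
```

Wiring it up would then be the same one-liner both real backends use: `RebuildJob::from_backend(backend).await`.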
 /// A rebuild job is responsible for managing a rebuild (copy) which reads
 /// from source_hdl and writes into destination_hdl from specified start to end.
-pub(super) struct RebuildJobBackend {
-    /// Name of the nexus associated with the rebuild job.
-    pub nexus_name: String,
-    /// Source URI of the healthy child to rebuild from.
-    pub src_uri: String,
-    /// Target URI of the out of sync child in need of a rebuild.
-    pub dst_uri: String,
-    /// The next block to be rebuilt.
-    pub(super) next: u64,
-    /// A pool of tasks which perform the actual data rebuild.
-    pub(super) task_pool: RebuildTasks,
-    /// Notification as a `fn` callback.
-    pub(super) notify_fn: fn(String, String) -> (),
+pub(super) struct RebuildJobBackendManager {
     /// Channel used to signal rebuild update.
     pub notify_chan: (Sender<RebuildState>, Receiver<RebuildState>),
     /// Current state of the rebuild job.
@@ -109,188 +95,56 @@
         Arc<parking_lot::Mutex<Vec<oneshot::Sender<RebuildState>>>>,
     /// Channel to share information between frontend and backend.
     pub(super) info_chan: RebuildFBendChan,
-    /// All the rebuild related descriptors.
-    pub(super) descriptor: Rc<RebuildDescriptor>,
     /// Job serial number.
     serial: u64,
+    /// The rebuild backend runner which implements the `RebuildBackend` and
+    /// performs a specific type of rebuild copy.
+    backend: Box<dyn RebuildBackend>,
 }
 
-impl std::fmt::Debug for RebuildJobBackend {
+impl std::fmt::Debug for RebuildJobBackendManager {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("RebuildJob")
-            .field("nexus", &self.nexus_name)
-            .field("source", &self.src_uri)
-            .field("destination", &self.dst_uri)
+            .field("backend", &self.backend)
             .field("serial", &self.serial)
             .finish()
     }
 }
 
-impl Display for RebuildJobBackend {
+impl std::fmt::Display for RebuildJobBackendManager {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "Rebuild job #{s} ({state}{done}) '{src}' -> '{dst}' \
-            on nexus '{nex}'",
+            "Rebuild job #{s} ({state}{done}) '{src}' -> '{dst}' {back}",
             s = self.serial,
             state = self.state(),
             done = if self.state().done() { ": done" } else { "" },
-            src = self.src_uri,
-            dst = self.dst_uri,
-            nex = self.nexus_name
+            src = self.backend.common_desc().src_uri,
+            dst = self.backend.common_desc().dst_uri,
+            back = self.backend,
         )
     }
 }
 
-impl RebuildJobBackend {
+impl RebuildJobBackendManager {
     /// Creates a new RebuildJob which rebuilds from source URI to target URI
     /// from start to end (of the data partition); notify_fn callback is called
     /// when the rebuild state is updated - with the nexus and destination
     /// URI as arguments.
-    pub async fn new(
-        nexus_name: &str,
-        src_uri: &str,
-        dst_uri: &str,
-        range: std::ops::Range<u64>,
-        options: RebuildJobOptions,
-        notify_fn: fn(String, String) -> (),
-    ) -> Result<Self, RebuildError> {
-        let src_descriptor = device_open(
-            &bdev_get_name(src_uri).context(BdevInvalidUri {
-                uri: src_uri.to_string(),
-            })?,
-            false,
-        )
-        .map_err(|e| RebuildError::BdevNotFound {
-            source: e,
-            bdev: src_uri.to_string(),
-        })?;
-
-        let dst_descriptor = device_open(
-            &bdev_get_name(dst_uri).context(BdevInvalidUri {
-                uri: dst_uri.to_string(),
-            })?,
-            true,
-        )
-        .map_err(|e| RebuildError::BdevNotFound {
-            source: e,
-            bdev: dst_uri.to_string(),
-        })?;
-
-        let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
-        let destination_hdl =
-            RebuildDescriptor::io_handle(&*dst_descriptor).await?;
-
-        if !Self::validate(
-            source_hdl.get_device(),
-            destination_hdl.get_device(),
-            &range,
-        ) {
-            return Err(RebuildError::InvalidParameters {});
-        };
-
-        // validation passed, block size is the same for both
-        let block_size = destination_hdl.get_device().block_len();
-        let segment_size_blks = SEGMENT_SIZE / block_size;
-
-        let mut tasks = RebuildTasks {
-            tasks: Default::default(),
-            // only sending one message per channel at a time so we don't need
-            // the extra buffer
-            channel: mpsc::channel(0),
-            active: 0,
-            total: SEGMENT_TASKS,
-            segments_done: 0,
-            segments_transferred: 0,
-        };
-
-        for _ in 0 .. tasks.total {
-            let buffer = destination_hdl
-                .dma_malloc(segment_size_blks * block_size)
-                .context(NoCopyBuffer {})?;
-
-            tasks.push(RebuildTask::new(buffer, tasks.channel.0.clone()));
-        }
-
-        let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false)
-            .context(BdevNotFound {
-                bdev: nexus_name.to_string(),
-            })?;
-
-        // Job serial numbers.
-        static SERIAL: AtomicU64 = AtomicU64::new(1);
-
-        let serial = SERIAL.fetch_add(1, Ordering::SeqCst);
-
+    pub fn new(backend: impl RebuildBackend + 'static) -> Self {
         let be = Self {
-            nexus_name: nexus_name.to_string(),
-            src_uri: src_uri.to_string(),
-            dst_uri: dst_uri.to_string(),
-            task_pool: tasks,
-            next: range.start,
-            notify_fn,
             notify_chan: unbounded::<RebuildState>(),
             states: Default::default(),
             complete_chan: Default::default(),
             info_chan: RebuildFBendChan::new(),
-            descriptor: Rc::new(RebuildDescriptor {
-                src_uri: src_uri.to_string(),
-                dst_uri: dst_uri.to_string(),
-                range,
-                options,
-                block_size,
-                segment_size_blks,
-                src_descriptor,
-                dst_descriptor,
-                nexus_descriptor,
-                start_time: Utc::now(),
-                rebuild_map: Arc::new(parking_lot::Mutex::new(None)),
-            }),
-            serial,
+            serial: 0,
+            backend: Box::new(backend),
         };
 
-        info!("{be}: backend created");
-
-        Ok(be)
-    }
-
-    /// State of the rebuild job
-    fn state(&self) -> RebuildState {
-        self.states.read().current
-    }
-
-    /// Reply back to the requester with the rebuild statistics.
-    async fn reply_stats(
-        &mut self,
-        requester: oneshot::Sender<RebuildStats>,
-    ) -> Result<(), RebuildStats> {
-        let s = self.stats();
-        trace!("{self}: current stats: {s:?}");
-        requester.send(s)?;
-        Ok(())
+        be
     }
 
-    /// Sets rebuild map for this job.
-    async fn set_rebuild_map(
-        &mut self,
-        map: RebuildMap,
-        s: oneshot::Sender<()>,
-    ) -> Result<(), RebuildError> {
-        {
-            let mut g = self.descriptor.rebuild_map.lock();
-            if g.is_some() {
-                error!("{self}: rebuild map is already set");
-            } else {
-                *g = Some(map);
-                debug!("{self}: set rebuild map");
-            }
-        }
-
-        s.send(()).ok();
-        Ok(())
-    }
-
-    /// Moves the rebuild job runner and runs until completion.
+    /// Moves the rebuild job manager and runs until completion.
     pub(super) async fn schedule(self) {
         let mut job = self;
         Reactors::master().send_future(async move { job.run().await });
@@ -301,41 +155,22 @@ impl RebuildJobBackend {
     async fn run(&mut self) {
         while !self.reconcile().done() {
             if !self.state().running() {
-                match self.info_chan.recv().await {
-                    Ok(RebuildJobRequest::WakeUp) => {}
-                    Ok(RebuildJobRequest::GetStats(reply)) => {
-                        self.reply_stats(reply).await.ok();
-                    }
-                    Ok(RebuildJobRequest::SetRebuildMap((map, s))) => {
-                        self.set_rebuild_map(map, s).await.ok();
-                    }
-                    Err(error) => {
-                        self.fail_with(error);
-                    }
-                }
+                let message = self.info_chan.recv().await;
+                self.handle_message(message).await;
                 continue;
             }
 
+            // todo: is there a bug here if we fail above?
             self.start_all_tasks();
             let mut recv = self.info_chan.recv_clone();
-            while self.task_pool.running() {
+            while self.task_pool().running() {
                 futures::select! {
-                    message = recv.next() => match message {
-                        Some(RebuildJobRequest::WakeUp) => { }
-                        Some(RebuildJobRequest::GetStats(reply)) => {
-                            self.reply_stats(reply).await.ok();
-                        }
-                        Some(RebuildJobRequest::SetRebuildMap((map, s))) => {
-                            self.set_rebuild_map(map, s).await.ok();
-                        }
-                        None => {
-                            // The frontend is gone (dropped), this should not happen, but let's
-                            // be defensive and simply cancel the rebuild.
-                            self.fail_with(RebuildError::FrontendGone);
-                            self.manage_tasks().await;
-                            break;
-                        }
+                    message = recv.next() => if !self.handle_message(message).await {
+                        // The frontend is gone (dropped), this should not happen, but let's
+                        // be defensive and simply cancel the rebuild.
+                        self.manage_tasks().await;
+                        break;
                     },
                     _ = self.manage_tasks().fuse() => {},
                 }
             }
         }
     }
 
-    /// Runs the management async task that kicks off N rebuild copy tasks and
-    /// awaits each completion. When any task completes it kicks off another
-    /// until the destination is fully rebuilt.
-    async fn manage_tasks(&mut self) {
-        while self.task_pool.active > 0 {
-            match self.await_one_task().await {
-                Some(r) => match r.error {
-                    None => {
-                        let state = self.states.read().clone();
-                        match state.pending {
-                            None | Some(RebuildState::Running) => {
-                                self.start_task_by_id(r.id);
-                            }
-                            _ => {
-                                // await all active tasks as we might still have
-                                // ongoing IO. do we need a timeout?
-                                self.await_all_tasks().await;
-                                break;
-                            }
-                        }
-                    }
-                    Some(e) => {
-                        error!(
-                            "{self}: failed to rebuild segment \
-                            id={sid} block={blk} with error: {e}",
-                            sid = r.id,
-                            blk = r.blk
-                        );
-                        self.fail_with(e);
-                        self.await_all_tasks().await;
-                        break;
-                    }
-                },
-                None => {
-                    // all senders have disconnected, out of place termination?
-                    self.task_sync_fail();
-                    break;
-                }
-            }
-        }
-    }
-
-    /// Calls the job's registered notify fn callback and notify sender channel
-    fn send_notify(&mut self) {
-        // should this return a status before we notify the sender channel?
-        (self.notify_fn)(self.nexus_name.clone(), self.dst_uri.clone());
-        if let Err(e) = self.notify_chan.0.send(self.state()) {
-            error!(
-                "{self}: failed to send complete via the unbound channel \
-                with error: {e}"
-            );
-        }
-    }
-
-    /// Check if the source and destination block devices are compatible for
-    /// rebuild
-    fn validate(
-        source: &dyn BlockDevice,
-        destination: &dyn BlockDevice,
-        range: &std::ops::Range<u64>,
-    ) -> bool {
-        // todo: make sure we don't overwrite the labels
-        let data_partition_start = 0;
-        range.within(data_partition_start .. source.num_blocks())
-            && range.within(data_partition_start .. destination.num_blocks())
-            && source.block_len() == destination.block_len()
-    }
+    /// State Management
 
     /// Reconciles the pending state to the current and clear the pending.
     fn reconcile(&mut self) -> RebuildState {
@@ -427,31 +196,73 @@
             "{self}: changing state from {old:?} to {new:?}; \
             current stats: {s:?}"
         );
-            self.send_notify();
+            self.on_state_change();
         }
 
         new
     }
 
+    /// Calls the job's registered notify fn callback and notify sender channel
+    fn on_state_change(&mut self) {
+        self.backend.on_state_change();
+
+        if let Err(e) = self.notify_chan.0.send(self.state()) {
+            error!(
+                "{self}: failed to send complete via the unbound channel \
+                with error: {e}"
+            );
+        }
+    }
+
+    /// State of the rebuild job
+    fn state(&self) -> RebuildState {
+        self.states.read().current
+    }
 
-    /// Collects statistics from the job
+    /// Fails the job, overriding any pending client operation
+    fn fail(&self) {
+        self.exec_internal_op(super::RebuildOperation::Fail).ok();
+    }
+
+    /// Fails the job, with the given error.
+    fn fail_with<E: Into<Option<RebuildError>>>(&mut self, error: E) {
+        self.fail();
+        self.states.write().error = error.into();
+    }
+
+    /// Completes the job, overriding any pending operation
+    fn complete(&self) {
+        self.exec_internal_op(super::RebuildOperation::Complete)
+            .ok();
+    }
+
+    /// Internal operations can bypass previous pending operations.
+    fn exec_internal_op(
+        &self,
+        op: super::RebuildOperation,
+    ) -> Result<bool, RebuildError> {
+        self.states.write().exec_op(op, true)
+    }
+
+    /// Generic Rebuild Statistics
+
+    /// Collects generic statistics from the job.
     pub fn stats(&self) -> RebuildStats {
-        let blocks_total =
-            self.descriptor.range.end - self.descriptor.range.start;
+        let descriptor = self.backend.common_desc();
+        let blocks_total = descriptor.range.end - descriptor.range.start;
 
         // segment size may not be aligned to the total size
         let blocks_recovered = std::cmp::min(
-            self.task_pool.segments_done * self.descriptor.segment_size_blks,
+            self.task_pool().segments_done * descriptor.segment_size_blks,
             blocks_total,
         );
 
         let blocks_transferred = std::cmp::min(
-            self.task_pool.segments_transferred
-                * self.descriptor.segment_size_blks,
+            self.task_pool().segments_transferred
+                * descriptor.segment_size_blks,
             blocks_total,
         );
 
-        let blocks_remaining = self
-            .descriptor
+        let blocks_remaining = descriptor
             .rebuild_map
             .lock()
             .as_ref()
@@ -463,33 +274,35 @@
         assert!(progress < 100 || blocks_remaining == 0);
 
         RebuildStats {
-            start_time: self.descriptor.start_time,
-            is_partial: self.descriptor.rebuild_map.lock().is_some(),
+            start_time: descriptor.start_time,
+            is_partial: descriptor.rebuild_map.lock().is_some(),
             blocks_total,
             blocks_recovered,
             blocks_transferred,
             blocks_remaining,
             progress,
-            blocks_per_task: self.descriptor.segment_size_blks,
-            block_size: self.descriptor.block_size,
-            tasks_total: self.task_pool.total as u64,
-            tasks_active: self.task_pool.active as u64,
+            blocks_per_task: descriptor.segment_size_blks,
+            block_size: descriptor.block_size,
+            tasks_total: self.task_pool().total as u64,
+            tasks_active: self.task_pool().active as u64,
         }
     }
 
-    /// Fails the job, overriding any pending client operation
-    fn fail(&self) {
-        self.exec_internal_op(super::RebuildOperation::Fail).ok();
+    /// Reply back to the requester with the generic rebuild stats.
+    async fn reply_stats(
+        &mut self,
+        requester: oneshot::Sender<RebuildStats>,
+    ) -> Result<(), RebuildStats> {
+        let s = self.stats();
+        trace!("{self}: current stats: {s:?}");
+        requester.send(s)?;
+        Ok(())
     }
 
-    /// Fails the job, with the given error.
-    fn fail_with<E: Into<Option<RebuildError>>>(&mut self, error: E) {
-        self.fail();
-        self.states.write().error = error.into();
-    }
+    /// Rebuild Tasks Management
 
     fn task_sync_fail(&mut self) {
-        let active = self.task_pool.active;
+        let active = self.task_pool().active;
         error!(
             "{self}: failed to wait for {active} rebuild tasks \
             due to task channel failure"
@@ -498,78 +311,61 @@
             active,
         });
     }
-
-    /// Completes the job, overriding any pending operation
-    fn complete(&self) {
-        self.exec_internal_op(super::RebuildOperation::Complete)
-            .ok();
-    }
-
-    /// Internal operations can bypass previous pending operations.
-    fn exec_internal_op(
-        &self,
-        op: super::RebuildOperation,
-    ) -> Result<bool, RebuildError> {
-        self.states.write().exec_op(op, true)
+    fn task_pool(&self) -> &RebuildTasks {
+        self.backend.task_pool()
     }
 
     /// Kicks off all rebuild tasks in the background, or as many as necessary
     /// to complete the rebuild.
     fn start_all_tasks(&mut self) {
         assert_eq!(
-            self.task_pool.active, 0,
+            self.task_pool().active,
+            0,
             "{} active tasks",
-            self.task_pool.active
+            self.task_pool().active
         );
 
-        for n in 0 .. self.task_pool.total {
+        for n in 0 .. self.task_pool().total {
             if !self.start_task_by_id(n) {
                 break;
             }
         }
 
         // Nothing to rebuild, in case we paused but the rebuild is complete
-        if self.task_pool.active == 0 {
+        if self.task_pool().active == 0 {
             self.complete();
         }
 
         let s = self.stats();
         debug!("{self}: started all tasks; current stats: {s:?}");
     }
 
     /// Tries to kick off a task by its identifier and returns result.
     /// todo: there's no need to use id's, just use a task from the pool.
     fn start_task_by_id(&mut self, id: usize) -> bool {
-        match self.send_segment_task(id) {
-            Some(next) => {
-                self.task_pool.active += 1;
-                self.next = next;
-                true
-            }
-            // we've already got enough tasks to rebuild the destination
-            None => {
-                if self.task_pool.active == 0 {
-                    self.complete();
-                }
-                false
+        if !self.backend.schedule_task_by_id(id) {
+            if self.task_pool().active == 0 {
+                self.complete();
             }
+            false
+        } else {
+            true
         }
     }
 
     /// Awaits for one rebuild task to complete and collect the task's result.
     async fn await_one_task(&mut self) -> Option<TaskResult> {
-        self.task_pool.await_one_task().await
+        self.backend.await_one_task().await
     }
 
     /// Awaits for all active rebuild tasks to complete.
     async fn await_all_tasks(&mut self) {
         debug!(
             "{self}: awaiting all active tasks ({})",
-            self.task_pool.active
+            self.task_pool().active
         );
-        while self.task_pool.active > 0 {
+        while self.task_pool().active > 0 {
             if self.await_one_task().await.is_none() {
                 // this should never happen, but just in case..
                 self.task_sync_fail();
@@ -580,30 +376,99 @@
         debug!("{self}: finished awaiting all tasks");
     }
 
-    /// Sends one segment worth of data in a reactor future and notifies the
-    /// management channel. Returns the next segment offset to rebuild, if any.
-    fn send_segment_task(&mut self, id: usize) -> Option<u64> {
-        if self.next >= self.descriptor.range.end {
-            None
-        } else {
-            let blk = self.next;
-            let next = std::cmp::min(
-                self.next + self.descriptor.segment_size_blks,
-                self.descriptor.range.end,
-            );
+    /// Runs the management async task which kicks off N rebuild copy tasks and
+    /// awaits each completion.
+    /// When any task completes, it kicks off another until the destination is
+    /// fully rebuilt.
+    async fn manage_tasks(&mut self) {
+        while self.task_pool().active > 0 {
+            match self.await_one_task().await {
+                Some(r) => match r.error {
+                    None => {
+                        let state = self.states.read().clone();
+                        match state.pending {
+                            None | Some(RebuildState::Running) => {
+                                self.start_task_by_id(r.id);
+                            }
+                            _ => {
+                                // await all active tasks as we might still have
+                                // ongoing IO. do we need a timeout?
+                                self.await_all_tasks().await;
+                                break;
+                            }
+                        }
+                    }
+                    Some(e) => {
+                        error!(
+                            "{self}: failed to rebuild segment \
+                            id={sid} block={blk} with error: {e}",
+                            sid = r.id,
+                            blk = r.blk
+                        );
+                        self.fail_with(e);
+                        self.await_all_tasks().await;
+                        break;
+                    }
+                },
+                None => {
+                    // all senders have disconnected, out of place termination?
+                    self.task_sync_fail();
+                    break;
+                }
+            }
+        }
+    }
 
-            self.task_pool
-                .send_segment(id, blk, self.descriptor.clone());
+    /// Handles a request message, replying to it if necessary.
+    /// Returns false if the message was empty (i.e. the frontend is gone)
+    async fn handle_message(
+        &mut self,
+        message: Option<RebuildJobRequest>,
+    ) -> bool {
+        match message {
+            Some(RebuildJobRequest::WakeUp) => {}
+            Some(RebuildJobRequest::GetStats(reply)) => {
+                self.reply_stats(reply).await.ok();
+            }
+            Some(RebuildJobRequest::SetRebuildMap((map, s))) => {
+                self.set_rebuild_map(map, s).await.ok();
+            }
+            None => {
+                self.fail_with(RebuildError::FrontendGone);
+                return false;
+            }
+        }
+        true
+    }
 
-            Some(next)
+    /// Sets rebuild map for this job.
+    async fn set_rebuild_map(
+        &mut self,
+        map: RebuildMap,
+        s: oneshot::Sender<()>,
+    ) -> Result<(), RebuildError> {
+        {
+            let mut g = self.backend.common_desc().rebuild_map.lock();
+            if g.is_some() {
+                error!("{self}: rebuild map is already set");
+            } else {
+                *g = Some(map);
+                debug!("{self}: set rebuild map");
+            }
         }
+
+        s.send(()).ok();
+        Ok(())
     }
 }
 
-impl Drop for RebuildJobBackend {
+impl Drop for RebuildJobBackendManager {
     fn drop(&mut self) {
         let stats = self.stats();
         info!("{self}: backend dropped; final stats: {stats:?}");
         self.states.write().set_final_stats(stats);
+        for sender in self.complete_chan.lock().drain(..) {
+            sender.send(self.state()).ok();
+        }
     }
 }
diff --git a/io-engine/src/rebuild/rebuild_map.rs b/io-engine/src/rebuild/rebuild_map.rs
index 8f3d628d6..8f0445392 100644
--- a/io-engine/src/rebuild/rebuild_map.rs
+++ b/io-engine/src/rebuild/rebuild_map.rs
@@ -3,7 +3,7 @@ use std::fmt::{Debug, Formatter};
 use crate::core::SegmentMap;
 
 /// Map of segments to be rebuilt.
-pub(crate) struct RebuildMap {
+pub struct RebuildMap {
     /// Name of the underlying block device.
     device_name: String,
     /// Map of device segments.
diff --git a/io-engine/src/rebuild/rebuild_task.rs b/io-engine/src/rebuild/rebuild_task.rs
index 038a670f6..0dd3e492a 100644
--- a/io-engine/src/rebuild/rebuild_task.rs
+++ b/io-engine/src/rebuild/rebuild_task.rs
@@ -1,18 +1,16 @@
 use futures::{channel::mpsc, stream::FusedStream, SinkExt, StreamExt};
 use parking_lot::Mutex;
-use snafu::ResultExt;
-use spdk_rs::{DmaBuf, LbaRange};
-use std::{rc::Rc, sync::Arc};
 
-use crate::core::{Reactors, VerboseError};
+use spdk_rs::DmaBuf;
+use std::{rc::Rc, sync::Arc};
 
-use super::{
-    rebuild_error::{RangeLockFailed, RangeUnlockFailed},
-    RebuildDescriptor,
-    RebuildError,
-    RebuildVerifyMode,
+use crate::{
+    core::{Reactors, VerboseError},
+    rebuild::SEGMENT_SIZE,
 };
 
+use super::{RebuildDescriptor, RebuildError, RebuildVerifyMode};
+
 /// Result returned by each segment task worker.
 /// Used to communicate with the management task indicating that the
 /// segment task worker is ready to copy another segment.
@@ -53,72 +51,9 @@
         }
     }
 
-    /// Copies one segment worth of data from source into destination. During
-    /// this time the LBA range being copied is locked so that there cannot be
-    /// front end I/O to the same LBA range.
-    ///
-    /// # Safety
-    ///
-    /// The lock and unlock functions internally reference the RangeContext as a
-    /// raw pointer, so rust cannot correctly manage its lifetime. The
-    /// RangeContext MUST NOT be dropped until after the lock and unlock have
-    /// completed.
-    ///
-    /// The use of RangeContext here is safe because it is stored on the stack
-    /// for the duration of the calls to lock and unlock.
-    async fn locked_copy_one(
-        &mut self,
-        blk: u64,
-        descriptor: &RebuildDescriptor,
-    ) -> Result<bool, RebuildError> {
-        if descriptor.is_blk_sync(blk) {
-            return Ok(false);
-        }
-
-        let len = descriptor.get_segment_size_blks(blk);
-        // The nexus children have metadata and data partitions, whereas the
-        // nexus has a data partition only. Because we are locking the range on
-        // the nexus, we need to calculate the offset from the start of the data
-        // partition.
-        let r = LbaRange::new(blk - descriptor.range.start, len);
-
-        // Wait for LBA range to be locked.
-        // This prevents other I/Os being issued to this LBA range whilst it is
-        // being rebuilt.
-        let lock = descriptor
-            .nexus_descriptor
-            .lock_lba_range(r)
-            .await
-            .context(RangeLockFailed {
-                blk,
-                len,
-            })?;
-
-        // Perform the copy.
diff --git a/io-engine/src/rebuild/rebuild_task.rs b/io-engine/src/rebuild/rebuild_task.rs
index 038a670f6..0dd3e492a 100644
--- a/io-engine/src/rebuild/rebuild_task.rs
+++ b/io-engine/src/rebuild/rebuild_task.rs
@@ -1,18 +1,16 @@
 use futures::{channel::mpsc, stream::FusedStream, SinkExt, StreamExt};
 use parking_lot::Mutex;
-use snafu::ResultExt;
-use spdk_rs::{DmaBuf, LbaRange};
-use std::{rc::Rc, sync::Arc};
 
-use crate::core::{Reactors, VerboseError};
+use spdk_rs::DmaBuf;
+use std::{rc::Rc, sync::Arc};
 
-use super::{
-    rebuild_error::{RangeLockFailed, RangeUnlockFailed},
-    RebuildDescriptor,
-    RebuildError,
-    RebuildVerifyMode,
+use crate::{
+    core::{Reactors, VerboseError},
+    rebuild::SEGMENT_SIZE,
 };
 
+use super::{RebuildDescriptor, RebuildError, RebuildVerifyMode};
+
 /// Result returned by each segment task worker.
 /// Used to communicate with the management task indicating that the
 /// segment task worker is ready to copy another segment.
@@ -53,72 +51,9 @@ impl RebuildTask {
         }
     }
 
-    /// Copies one segment worth of data from source into destination. During
-    /// this time the LBA range being copied is locked so that there cannot be
-    /// front end I/O to the same LBA range.
-    ///
-    /// # Safety
-    ///
-    /// The lock and unlock functions internally reference the RangeContext as a
-    /// raw pointer, so rust cannot correctly manage its lifetime. The
-    /// RangeContext MUST NOT be dropped until after the lock and unlock have
-    /// completed.
-    ///
-    /// The use of RangeContext here is safe because it is stored on the stack
-    /// for the duration of the calls to lock and unlock.
-    async fn locked_copy_one(
-        &mut self,
-        blk: u64,
-        descriptor: &RebuildDescriptor,
-    ) -> Result<bool, RebuildError> {
-        if descriptor.is_blk_sync(blk) {
-            return Ok(false);
-        }
-
-        let len = descriptor.get_segment_size_blks(blk);
-        // The nexus children have metadata and data partitions, whereas the
-        // nexus has a data partition only. Because we are locking the range on
-        // the nexus, we need to calculate the offset from the start of the data
-        // partition.
-        let r = LbaRange::new(blk - descriptor.range.start, len);
-
-        // Wait for LBA range to be locked.
-        // This prevents other I/Os being issued to this LBA range whilst it is
-        // being rebuilt.
-        let lock = descriptor
-            .nexus_descriptor
-            .lock_lba_range(r)
-            .await
-            .context(RangeLockFailed {
-                blk,
-                len,
-            })?;
-
-        // Perform the copy.
-        let result = self.copy_one(blk, descriptor).await;
-
-        // Wait for the LBA range to be unlocked.
-        // This allows others I/Os to be issued to this LBA range once again.
-        descriptor
-            .nexus_descriptor
-            .unlock_lba_range(lock)
-            .await
-            .context(RangeUnlockFailed {
-                blk,
-                len,
-            })?;
-
-        // In the case of success, mark the segment as already transferred.
-        if result.is_ok() {
-            descriptor.blk_synced(blk);
-        }
-
-        result
-    }
-
     /// Copies one segment worth of data from source into destination.
     /// Returns true if write transfer took place, false otherwise.
-    async fn copy_one(
+    pub(super) async fn copy_one(
         &mut self,
         offset_blk: u64,
         desc: &RebuildDescriptor,
@@ -173,10 +108,33 @@ impl std::fmt::Debug for RebuildTasks {
 }
 
 impl RebuildTasks {
-    /// Add the given `RebuildTask` to the task pool.
-    pub(super) fn push(&mut self, task: RebuildTask) {
-        self.tasks.push(Arc::new(Mutex::new(task)));
+    /// Creates a rebuild task pool for the given rebuild descriptor.
+    /// Each task can be scheduled to run concurrently, and each task
+    /// gets its own `DmaBuf` from which it reads and writes.
+    pub(super) fn new(
+        task_count: usize,
+        desc: &RebuildDescriptor,
+    ) -> Result<Self, RebuildError> {
+        // Only one message is sent per channel at a time, so we don't need
+        // the extra buffer.
+        let channel = mpsc::channel(0);
+        let tasks = (0 .. task_count).map(|_| {
+            let buffer = desc.dma_malloc(SEGMENT_SIZE)?;
+            let task = RebuildTask::new(buffer, channel.0.clone());
+            Ok(Arc::new(Mutex::new(task)))
+        });
+        assert_eq!(tasks.len(), task_count);
+
+        Ok(RebuildTasks {
+            total: tasks.len(),
+            tasks: tasks.collect::<Result<Vec<_>, _>>()?,
+            channel,
+            active: 0,
+            segments_done: 0,
+            segments_transferred: 0,
+        })
     }
+
+    /// Check if there's at least one task still running.
     pub(super) fn running(&self) -> bool {
         self.active > 0 && !self.channel.1.is_terminated()
@@ -197,18 +155,21 @@ impl RebuildTasks {
     /// Schedules the run of a task by its id. It will copy the segment size
     /// starting at the given block address from source to destination.
     /// todo: don't use a specific task, simply get the next from the pool.
-    pub(super) fn send_segment(
+    pub(super) fn schedule_segment_rebuild(
         &mut self,
         id: usize,
         blk: u64,
-        descriptor: Rc<RebuildDescriptor>,
+        copier: Rc<dyn RebuildTaskCopier>,
     ) {
         let task = self.tasks[id].clone();
         Reactors::current().send_future(async move {
             // No other thread/task will acquire the mutex at the same time.
             let mut task = task.lock();
-            let result = task.locked_copy_one(blk, &descriptor).await;
+
+            // Could we make this the option, rather than the descriptor itself?
+            let result = copier.copy_segment(blk, &mut task).await;
+
             let is_transferred = *result.as_ref().unwrap_or(&false);
             let error = TaskResult {
                 id,
@@ -227,3 +188,43 @@ impl RebuildTasks {
         });
     }
 }
+
+/// Interface to allow for different implementations of a single task copy
+/// operation.
+/// Currently allows for only the copy of a single segment, though this
+/// can be expanded for sub-segment copies.
+#[async_trait::async_trait(?Send)]
+pub(super) trait RebuildTaskCopier {
+    /// Copies an entire segment at the given block address, from source to
+    /// target using a `DmaBuf`.
+    async fn copy_segment(
+        &self,
+        blk: u64,
+        task: &mut RebuildTask,
+    ) -> Result<bool, RebuildError>;
+}
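
The trait is the seam that lets nexus and bdev rebuilds share the same task pool while copying differently. For instance, a copier that ignores sync state entirely (a full, map-less device copy) could be as small as the sketch below; `CopyAll` is hypothetical and only illustrates the indirection, so the real bdev backend may well differ:

/// Hypothetical copier: always copies the segment, never consults a
/// rebuild map. Wraps the same descriptor the pool already uses.
struct CopyAll(RebuildDescriptor);

#[async_trait::async_trait(?Send)]
impl RebuildTaskCopier for CopyAll {
    async fn copy_segment(
        &self,
        blk: u64,
        task: &mut RebuildTask,
    ) -> Result<bool, RebuildError> {
        // No is_blk_sync() gate and no blk_synced() bookkeeping.
        task.copy_one(blk, &self.0).await
    }
}

Because `schedule_segment_rebuild` takes an `Rc<dyn RebuildTaskCopier>`, swapping copiers requires no change to the pool itself.
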
+
+#[async_trait::async_trait(?Send)]
+impl RebuildTaskCopier for RebuildDescriptor {
+    /// Copies one segment worth of data from source into destination.
+    async fn copy_segment(
+        &self,
+        blk: u64,
+        task: &mut RebuildTask,
+    ) -> Result<bool, RebuildError> {
+        // todo: move the map out of the descriptor, into the specific backends.
+        if self.is_blk_sync(blk) {
+            return Ok(false);
+        }
+
+        // Perform the copy.
+        let result = task.copy_one(blk, self).await;
+
+        // In the case of success, mark the segment as already transferred.
+        if result.is_ok() {
+            self.blk_synced(blk);
+        }
+
+        result
+    }
+}
diff --git a/io-engine/tests/nexus_rebuild.rs b/io-engine/tests/nexus_rebuild.rs
index 2db1207f1..1e289487a 100644
--- a/io-engine/tests/nexus_rebuild.rs
+++ b/io-engine/tests/nexus_rebuild.rs
@@ -7,11 +7,15 @@ use tracing::error;
 use io_engine::{
     bdev::{device_open, nexus::nexus_lookup_mut},
     core::{MayastorCliArgs, Mthread, Protocol},
-    rebuild::{RebuildJob, RebuildState, RebuildState::Completed},
+    rebuild::{NexusRebuildJob, RebuildState, RebuildState::Completed},
 };
 
 pub mod common;
 use common::{compose::MayastorTest, reactor_poll, wait_for_rebuild};
+use io_engine::{
+    bdev::device_create,
+    rebuild::{BdevRebuildJob, RebuildJobOptions},
+};
 
 // each test `should` use a different nexus name to prevent clashing with
 // one another. This allows the failed tests to `panic gracefully` improving
@@ -217,9 +221,10 @@ async fn rebuild_replica() {
             .unwrap();
 
         for child in 0 .. NUM_CHILDREN {
-            RebuildJob::lookup(&get_dev(child)).expect_err("Should not exist");
+            NexusRebuildJob::lookup(&get_dev(child))
+                .expect_err("Should not exist");
 
-            RebuildJob::lookup_src(&get_dev(child))
+            NexusRebuildJob::lookup_src(&get_dev(child))
                 .iter()
                 .inspect(|&job| {
                     error!(
@@ -233,17 +238,17 @@ async fn rebuild_replica() {
         let _ = nexus.start_rebuild(&get_dev(NUM_CHILDREN)).await;
 
         for child in 0 .. NUM_CHILDREN {
-            RebuildJob::lookup(&get_dev(child))
+            NexusRebuildJob::lookup(&get_dev(child))
                 .expect_err("rebuild job not created yet");
         }
-        let src = RebuildJob::lookup(&get_dev(NUM_CHILDREN))
+        let src = NexusRebuildJob::lookup(&get_dev(NUM_CHILDREN))
             .expect("now the job should exist")
             .src_uri()
             .to_string();
 
         for child in 0 .. NUM_CHILDREN {
             if get_dev(child) != src {
-                RebuildJob::lookup_src(&get_dev(child))
+                NexusRebuildJob::lookup_src(&get_dev(child))
                     .iter()
                     .filter(|s| s.dst_uri() != get_dev(child))
                     .inspect(|&job| {
@@ -257,7 +262,7 @@ async fn rebuild_replica() {
         }
 
         assert_eq!(
-            RebuildJob::lookup_src(&src)
+            NexusRebuildJob::lookup_src(&src)
                 .iter()
                 .inspect(|&job| {
                     assert_eq!(job.dst_uri(), get_dev(NUM_CHILDREN));
@@ -279,7 +284,7 @@ async fn rebuild_replica() {
             .pause_rebuild(&get_dev(NUM_CHILDREN))
             .await
            .unwrap();
-        assert_eq!(RebuildJob::lookup_src(&src).len(), 1);
+        assert_eq!(NexusRebuildJob::lookup_src(&src).len(), 1);
 
         nexus
             .as_mut()
@@ -287,7 +292,7 @@ async fn rebuild_replica() {
             .await
             .unwrap();
         let _ = nexus.start_rebuild(&get_dev(NUM_CHILDREN + 1)).await;
-        assert_eq!(RebuildJob::lookup_src(&src).len(), 2);
+        assert_eq!(NexusRebuildJob::lookup_src(&src).len(), 2);
     })
     .await;
 
@@ -318,3 +323,32 @@ async fn rebuild_replica() {
     })
     .await;
 }
+
+#[tokio::test]
+async fn rebuild_bdev() {
+    test_ini("rebuild_bdev");
+
+    let ms = get_ms();
+
+    ms.spawn(async move {
+        let src_uri = "malloc:///d?size_mb=100";
+        let dst_uri = "malloc:///d2?size_mb=100";
+
+        device_create(src_uri).await.unwrap();
+        device_create(dst_uri).await.unwrap();
+
+        let job = BdevRebuildJob::new(
+            src_uri,
+            dst_uri,
+            None,
+            RebuildJobOptions::default(),
+            |_, _| {},
+        )
+        .await
+        .unwrap();
+
+        let chan = job.start(None).await.unwrap();
+        let state = chan.await.unwrap();
+        assert_eq!(state, RebuildState::Completed, "Rebuild should succeed");
+    })
+    .await;
+}
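
`RebuildJobOptions::default()` keeps the new test minimal; the nexus path in this patch builds its options explicitly, including a `verify_mode`. Under the assumptions that the options struct supports struct-update syntax and that `RebuildVerifyMode` has a `Fail` variant (neither is confirmed by this diff), a verifying variant of the bdev job could be sketched as:

use io_engine::rebuild::{
    BdevRebuildJob,
    RebuildJobOptions,
    RebuildVerifyMode,
};

/// Sketch: create a bdev rebuild job that read-verifies each copied
/// segment and fails the job on a mismatch. The `|_, _| {}` notify
/// callback is discarded here, exactly as in the test above.
async fn verified_bdev_rebuild_job(
    src_uri: &str,
    dst_uri: &str,
) -> BdevRebuildJob {
    let opts = RebuildJobOptions {
        verify_mode: RebuildVerifyMode::Fail,
        ..Default::default()
    };
    BdevRebuildJob::new(src_uri, dst_uri, None, opts, |_, _| {})
        .await
        .expect("failed to create the bdev rebuild job")
}
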