diff --git a/io-engine/src/core/mod.rs b/io-engine/src/core/mod.rs index 962623e3ed..03fb575391 100644 --- a/io-engine/src/core/mod.rs +++ b/io-engine/src/core/mod.rs @@ -98,7 +98,7 @@ mod nic; pub mod partition; mod reactor; pub mod runtime; -pub(crate) mod segment_map; +pub mod segment_map; mod share; pub mod snapshot; pub(crate) mod thread; diff --git a/io-engine/src/core/segment_map.rs b/io-engine/src/core/segment_map.rs index eba3c4f03f..c7d99c16ca 100644 --- a/io-engine/src/core/segment_map.rs +++ b/io-engine/src/core/segment_map.rs @@ -1,4 +1,4 @@ -use bit_vec::BitVec; +use bit_vec::{BitBlock, BitVec}; use std::fmt::{Debug, Formatter}; // Returns ceil of an integer division. @@ -10,10 +10,10 @@ fn div_ceil(a: u64, b: u64) -> u64 { /// It marks every segment as a clean (no need to rebuild, or already /// transferred), or dirty (need to transfer from a healthy device). #[derive(Clone)] -pub(crate) struct SegmentMap { +pub struct SegmentMap { /// Bitmap of rebuild segments of a device. Zeros indicate clean segments, /// ones mark dirty ones. - segments: BitVec, + segments: BitVec, /// Device size in segments. num_segments: u64, /// Device size in blocks. @@ -24,7 +24,7 @@ pub(crate) struct SegmentMap { segment_size: u64, } -impl Debug for SegmentMap { +impl Debug for SegmentMap { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, @@ -37,15 +37,11 @@ impl Debug for SegmentMap { } } -impl SegmentMap { +impl SegmentMap { /// Creates a new segment map with the given parameters. - pub(crate) fn new( - num_blocks: u64, - block_len: u64, - segment_size: u64, - ) -> Self { + pub fn new(num_blocks: u64, block_len: u64, segment_size: u64) -> Self { let num_segments = div_ceil(num_blocks * block_len, segment_size); - let mut segments = BitVec::new(); + let mut segments = BitVec::::default(); segments.grow(num_segments as usize, false); Self { segments, @@ -57,14 +53,14 @@ impl SegmentMap { } /// Merges (bitwise OR) this map with another. 
- pub(crate) fn merge(mut self, other: &SegmentMap) -> Self { + pub(crate) fn merge(mut self, other: &SegmentMap) -> Self { self.segments.or(&other.segments); self } /// Sets a segment bit corresponding to the given logical block, to the /// given value. - pub(crate) fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) { + pub fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) { assert_ne!(self.num_blocks, 0); let start_seg = self.lbn_to_seg(lbn); @@ -76,7 +72,7 @@ impl SegmentMap { } /// Returns value of segment bit corresponding to the given logical block. - pub(crate) fn get(&self, lbn: u64) -> Option { + pub fn get(&self, lbn: u64) -> Option { let seg = self.lbn_to_seg(lbn); self.segments.get(seg) } @@ -93,7 +89,7 @@ impl SegmentMap { } /// Counts the total number of dirty blocks. - pub(crate) fn count_dirty_blks(&self) -> u64 { + pub fn count_dirty_blks(&self) -> u64 { self.count_ones() * self.segment_size / self.block_len } @@ -101,6 +97,11 @@ impl SegmentMap { pub(crate) fn segment_size_blks(&self) -> u64 { self.segment_size / self.block_len } + + /// Get the full size referenced by the bitmap in blocks. 
+ pub(crate) fn size_blks(&self) -> u64 { + self.num_blocks + } } impl From for BitVec { diff --git a/io-engine/src/rebuild/bdev_rebuild.rs b/io-engine/src/rebuild/bdev_rebuild.rs index f25590863b..23bcbf2a40 100644 --- a/io-engine/src/rebuild/bdev_rebuild.rs +++ b/io-engine/src/rebuild/bdev_rebuild.rs @@ -11,8 +11,9 @@ use super::{ }; use crate::{ + core::SegmentMap, gen_rebuild_instances, - rebuild::rebuilders::{FullRebuild, RangeRebuilder}, + rebuild::rebuilders::{FullRebuild, PartialRebuild, RangeRebuilder}, }; /// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads @@ -32,25 +33,73 @@ impl Deref for BdevRebuildJob { } } -impl BdevRebuildJob { - /// Creates a new RebuildJob which rebuilds from source URI to target URI - /// from start to end (of the data partition); notify_fn callback is called - /// when the rebuild state is updated - with the source and destination - /// bdev URI's as arguments. - pub async fn new( +/// Builder for the `BdevRebuildJob`, allowing for custom bdev to bdev rebuilds. +#[derive(Default)] +pub struct BdevRebuildJobBuilder { + range: Option>, + options: RebuildJobOptions, + notify_fn: Option ()>, + rebuild_map: Option, +} +impl BdevRebuildJobBuilder { + /// Specify a particular range. + pub fn with_range(mut self, range: Range) -> Self { + self.range = Some(range); + self + } + /// Specify the rebuild options. + pub fn with_option(mut self, options: RebuildJobOptions) -> Self { + self.options = options; + self + } + /// Specify a notification function. + pub fn with_notify_fn(mut self, notify_fn: fn(&str, &str) -> ()) -> Self { + self.notify_fn = Some(notify_fn); + self + } + /// Specify a rebuild map, turning it into a partial rebuild. + pub fn with_bitmap(mut self, rebuild_map: SegmentMap) -> Self { + self.rebuild_map = Some(rebuild_map); + self + } + /// Builds a `BdevRebuildJob` which can be started and which will then + /// rebuild from source to destination. 
+ pub async fn build( + self, src_uri: &str, dst_uri: &str, - range: Option>, - options: RebuildJobOptions, - notify_fn: fn(&str, &str) -> (), - ) -> Result { + ) -> Result { let descriptor = - RebuildDescriptor::new(src_uri, dst_uri, range, options).await?; - let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?; - let backend = - BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?; + RebuildDescriptor::new(src_uri, dst_uri, self.range, self.options) + .await?; + let task_pool = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?; + let notify_fn = self.notify_fn.unwrap_or(|_, _| {}); + match self.rebuild_map { + Some(map) => { + descriptor.validate_map(&map)?; + let backend = BdevRebuildJobBackend { + task_pool, + notify_fn, + copier: PartialRebuild::new(map, descriptor), + }; + RebuildJob::from_backend(backend).await.map(BdevRebuildJob) + } + None => { + let backend = BdevRebuildJobBackend { + task_pool, + notify_fn, + copier: FullRebuild::new(descriptor), + }; + RebuildJob::from_backend(backend).await.map(BdevRebuildJob) + } + } + } +} - RebuildJob::from_backend(backend).await.map(Self) +impl BdevRebuildJob { + /// Helps create a `Self` using a builder: `BdevRebuildJobBuilder`. + pub fn builder() -> BdevRebuildJobBuilder { + BdevRebuildJobBuilder::default() } } @@ -58,17 +107,19 @@ gen_rebuild_instances!(BdevRebuildJob); /// A rebuild job which is responsible for rebuilding from /// source to target of the `RebuildDescriptor`. -pub(super) struct BdevRebuildJobBackend { +pub(super) struct BdevRebuildJobBackend> { /// A pool of tasks which perform the actual data rebuild. task_pool: RebuildTasks, /// A generic rebuild descriptor. - copier: FullRebuild, + copier: R, /// Notification callback with src and dst uri's. 
notify_fn: fn(&str, &str) -> (), } #[async_trait::async_trait(?Send)] -impl RebuildBackend for BdevRebuildJobBackend { +impl> RebuildBackend + for BdevRebuildJobBackend +{ fn on_state_change(&mut self) { let desc = self.common_desc(); (self.notify_fn)(&desc.src_uri, &desc.dst_uri); @@ -110,33 +161,19 @@ impl RebuildBackend for BdevRebuildJobBackend { } } -impl std::fmt::Debug for BdevRebuildJobBackend { +impl> std::fmt::Debug + for BdevRebuildJobBackend +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BdevRebuildJob") .field("next", &self.copier.peek_next()) .finish() } } -impl std::fmt::Display for BdevRebuildJobBackend { +impl> std::fmt::Display + for BdevRebuildJobBackend +{ fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { Ok(()) } } - -impl BdevRebuildJobBackend { - /// Creates a new RebuildJob which rebuilds from source URI to target URI - /// from start to end (of the data partition); notify_fn callback is called - /// when the rebuild state is updated - with the source and destination - /// URI as arguments. 
- pub async fn new( - task_pool: RebuildTasks, - notify_fn: fn(&str, &str) -> (), - descriptor: RebuildDescriptor, - ) -> Result { - Ok(Self { - task_pool, - copier: FullRebuild::new(descriptor), - notify_fn, - }) - } -} diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs index d4c65ae224..2d1ea8ccb9 100644 --- a/io-engine/src/rebuild/rebuild_descriptor.rs +++ b/io-engine/src/rebuild/rebuild_descriptor.rs @@ -17,6 +17,7 @@ use crate::{ CoreError, IoCompletionStatus, ReadOptions, + SegmentMap, }, rebuild::{ rebuild_error::{BdevInvalidUri, NoCopyBuffer}, @@ -108,7 +109,7 @@ impl RebuildDescriptor { destination_hdl.get_device(), &range, ) { - return Err(RebuildError::InvalidParameters {}); + return Err(RebuildError::InvalidBdevRange {}); } let block_size = dst_descriptor.get_device().block_len(); @@ -128,7 +129,7 @@ impl RebuildDescriptor { } /// Check if the source and destination block devices are compatible for - /// rebuild + /// rebuild. fn validate( source: &dyn BlockDevice, destination: &dyn BlockDevice, @@ -141,6 +142,17 @@ impl RebuildDescriptor { && source.block_len() == destination.block_len() } + /// Check if the rebuild range is compatible with the rebuild segment map. + pub(crate) fn validate_map( + &self, + map: &SegmentMap, + ) -> Result<(), RebuildError> { + if map.size_blks() > self.range.end { + return Err(RebuildError::InvalidMapRange {}); + } + Ok(()) + } + /// Return the size of the segment to be copied. 
#[inline(always)] pub(super) fn get_segment_size_blks(&self, blk: u64) -> u64 { diff --git a/io-engine/src/rebuild/rebuild_error.rs b/io-engine/src/rebuild/rebuild_error.rs index 0150b7764d..dab384085f 100644 --- a/io-engine/src/rebuild/rebuild_error.rs +++ b/io-engine/src/rebuild/rebuild_error.rs @@ -13,8 +13,10 @@ pub enum RebuildError { JobAlreadyExists { job: String }, #[snafu(display("Failed to allocate buffer for the rebuild copy"))] NoCopyBuffer { source: DmaError }, - #[snafu(display("Failed to validate rebuild job creation parameters"))] - InvalidParameters {}, + #[snafu(display("Block device size is not compatible"))] + InvalidBdevRange {}, + #[snafu(display("Map range is not compatible with block device range"))] + InvalidMapRange {}, #[snafu(display( "The same device was specified for both source and destination: {bdev}" ))] diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs index 87973c5c0c..b090fdd5e7 100644 --- a/io-engine/src/rebuild/rebuild_job_backend.rs +++ b/io-engine/src/rebuild/rebuild_job_backend.rs @@ -313,7 +313,12 @@ impl RebuildJobBackendManager { let blocks_remaining = self.backend.blocks_remaining(); let progress = (blocks_recovered * 100) / blocks_total; - assert!(progress < 100 || blocks_remaining == 0); + assert!( + progress < 100 || blocks_remaining == 0, + "progress is {}% but there are {} blocks remaining", + progress, + blocks_remaining + ); RebuildStats { start_time: descriptor.start_time, diff --git a/io-engine/src/rebuild/rebuild_map.rs b/io-engine/src/rebuild/rebuild_map.rs index c9383478a0..b4d5eb80ca 100644 --- a/io-engine/src/rebuild/rebuild_map.rs +++ b/io-engine/src/rebuild/rebuild_map.rs @@ -62,11 +62,6 @@ impl RebuildMap { pub(crate) fn count_dirty_blks(&self) -> u64 { self.segments.count_dirty_blks() } - - /// Get the rebuild map segment size in blocks. 
- pub(crate) fn segment_size_blks(&self) -> u64 { - self.segments.segment_size_blks() - } } impl From for BitVec { diff --git a/io-engine/src/rebuild/rebuilders.rs b/io-engine/src/rebuild/rebuilders.rs index 259c7ed4ce..7d3f2eae14 100644 --- a/io-engine/src/rebuild/rebuilders.rs +++ b/io-engine/src/rebuild/rebuilders.rs @@ -1,8 +1,11 @@ -use crate::rebuild::{ - rebuild_descriptor::RebuildDescriptor, - rebuild_task::{RebuildTask, RebuildTaskCopier}, - RebuildError, - RebuildMap, +use crate::{ + core::SegmentMap, + rebuild::{ + rebuild_descriptor::RebuildDescriptor, + rebuild_task::{RebuildTask, RebuildTaskCopier}, + RebuildError, + RebuildMap, + }, }; use bit_vec::BitVec; use std::{ops::Range, rc::Rc}; @@ -84,7 +87,7 @@ impl PartialRebuild { /// Create a partial sequential rebuild with the given copier and segment /// map. #[allow(dead_code)] - pub(super) fn new(map: RebuildMap, copier: T) -> Self { + pub(super) fn new(map: SegmentMap, copier: T) -> Self { let total_blks = map.count_dirty_blks(); let segment_size_blks = map.segment_size_blks(); let bit_vec: BitVec = map.into(); diff --git a/io-engine/tests/nexus_rebuild.rs b/io-engine/tests/nexus_rebuild.rs index 30b7ef73a1..cf33f3f040 100644 --- a/io-engine/tests/nexus_rebuild.rs +++ b/io-engine/tests/nexus_rebuild.rs @@ -5,17 +5,18 @@ use once_cell::sync::{Lazy, OnceCell}; use tracing::error; use io_engine::{ - bdev::{device_open, nexus::nexus_lookup_mut}, + bdev::{ + device_create, + device_destroy, + device_open, + nexus::nexus_lookup_mut, + }, core::{MayastorCliArgs, Mthread, Protocol}, - rebuild::{NexusRebuildJob, RebuildState, RebuildState::Completed}, + rebuild::{BdevRebuildJob, NexusRebuildJob, RebuildState}, }; pub mod common; use common::{compose::MayastorTest, reactor_poll, wait_for_rebuild}; -use io_engine::{ - bdev::device_create, - rebuild::{BdevRebuildJob, RebuildJobOptions}, -}; // each test `should` use a different nexus name to prevent clashing with // one another. 
This allows the failed tests to `panic gracefully` improving @@ -133,7 +134,7 @@ async fn wait_for_replica_rebuild(src_replica: &str, new_replica: &str) { Err(_e) => true, /* Rebuild task completed and was * removed */ // discarded. - Ok(s) => s == Completed, + Ok(s) => s == RebuildState::Completed, } }) .await; @@ -337,18 +338,110 @@ async fn rebuild_bdev() { device_create(src_uri).await.unwrap(); device_create(dst_uri).await.unwrap(); - let job = BdevRebuildJob::new( - src_uri, - dst_uri, - None, - RebuildJobOptions::default(), - |_, _| {}, - ) - .await - .unwrap(); + let job = BdevRebuildJob::builder() + .build(src_uri, dst_uri) + .await + .unwrap(); let chan = job.start().await.unwrap(); let state = chan.await.unwrap(); + // todo: use completion channel with stats rather than just state? + let stats = job.stats().await; + + device_destroy(src_uri).await.unwrap(); + device_destroy(dst_uri).await.unwrap(); + assert_eq!(state, RebuildState::Completed, "Rebuild should succeed"); + assert_eq!(stats.blocks_transferred, 100 * 1024 * 2); + }) + .await; +} + +#[tokio::test] +async fn rebuild_bdev_partial() { + test_ini("rebuild_bdev_partial"); + + let ms = get_ms(); + + use io_engine::core::segment_map::SegmentMap; + + struct PartialMap(SegmentMap); + impl PartialMap { + fn new() -> Self { + let size = 100 * 1024 * 1024; + let seg_size = Self::seg_size(); + let blk_size = Self::blk_size(); + let rebuild_map = + SegmentMap::new(size / blk_size, blk_size, seg_size); + Self(rebuild_map) + } + fn blk_size() -> u64 { + 512 + } + fn seg_size() -> u64 { + 64 * 1024 + } + fn seg_blks() -> u64 { + Self::seg_size() / Self::blk_size() + } + fn seg(self, seg: u64) -> Self { + self.seg_n(seg, 1) + } + fn blk_n(mut self, blk: u64, cnt: u64) -> Self { + assert!(cnt > 0, "Must set something!"); + self.0.set(blk, cnt, true); + self + } + fn seg_n(self, seg: u64, cnt: u64) -> Self { + let seg_size = Self::seg_blks(); + self.blk_n(seg * seg_size, seg_size * cnt) + } + fn build(self) -> 
SegmentMap { + self.0 + } + } + + ms.spawn(async move { + let src_uri = "malloc:///d?size_mb=100"; + let dst_uri = "malloc:///d2?size_mb=100"; + + device_create(src_uri).await.unwrap(); + device_create(dst_uri).await.unwrap(); + + let rebuild_check = |rebuild_map: SegmentMap, index: usize| async move { + let dirty_blks = rebuild_map.count_dirty_blks(); + let job = BdevRebuildJob::builder() + .with_bitmap(rebuild_map) + .build(src_uri, dst_uri) + .await + .unwrap(); + let chan = job.start().await.unwrap(); + let state = chan.await.unwrap(); + assert_eq!( + state, + RebuildState::Completed, + "Rebuild should succeed" + ); + let stats = job.stats().await; + assert_eq!( + stats.blocks_transferred, dirty_blks, + "Test {} failed", + index + ); + }; + + let test_table = vec![ + PartialMap::new().seg(1).seg(2), + PartialMap::new().seg(1).seg(2).seg(1).seg_n(2, 1), + PartialMap::new().seg(20).seg(3).seg(10), + PartialMap::new().seg(20).seg(3).seg_n(10, 2), + ]; + + for (i, test) in test_table.into_iter().enumerate() { + rebuild_check(test.build(), i).await; + } + + device_destroy(src_uri).await.unwrap(); + device_destroy(dst_uri).await.unwrap(); }) .await; }