Skip to content

Commit

Permalink
refactor(rebuild/bdev): use builder pattern
Browse files Browse the repository at this point in the history
Allows for either full or partial rebuild from bdev to bdev.
Adds tests ensuring both full and partial rebuild are correct.

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Mar 11, 2024
1 parent 316c7cd commit 7c42799
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 87 deletions.
2 changes: 1 addition & 1 deletion io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ mod nic;
pub mod partition;
mod reactor;
pub mod runtime;
pub(crate) mod segment_map;
pub mod segment_map;
mod share;
pub mod snapshot;
pub(crate) mod thread;
Expand Down
31 changes: 16 additions & 15 deletions io-engine/src/core/segment_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bit_vec::BitVec;
use bit_vec::{BitBlock, BitVec};
use std::fmt::{Debug, Formatter};

// Returns ceil of an integer division.
Expand All @@ -10,10 +10,10 @@ fn div_ceil(a: u64, b: u64) -> u64 {
/// It marks every segment as a clean (no need to rebuild, or already
/// transferred), or dirty (need to transfer from a healthy device).
#[derive(Clone)]
pub(crate) struct SegmentMap {
pub struct SegmentMap<B: BitBlock = u32> {
/// Bitmap of rebuild segments of a device. Zeros indicate clean segments,
/// ones mark dirty ones.
segments: BitVec,
segments: BitVec<B>,
/// Device size in segments.
num_segments: u64,
/// Device size in blocks.
Expand All @@ -24,7 +24,7 @@ pub(crate) struct SegmentMap {
segment_size: u64,
}

impl Debug for SegmentMap {
impl<B: BitBlock> Debug for SegmentMap<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand All @@ -37,15 +37,11 @@ impl Debug for SegmentMap {
}
}

impl SegmentMap {
impl<B: BitBlock> SegmentMap<B> {
/// Creates a new segment map with the given parameters.
pub(crate) fn new(
num_blocks: u64,
block_len: u64,
segment_size: u64,
) -> Self {
pub fn new(num_blocks: u64, block_len: u64, segment_size: u64) -> Self {
let num_segments = div_ceil(num_blocks * block_len, segment_size);
let mut segments = BitVec::new();
let mut segments = BitVec::<B>::default();
segments.grow(num_segments as usize, false);
Self {
segments,
Expand All @@ -57,14 +53,14 @@ impl SegmentMap {
}

/// Merges (bitwise OR) this map with another.
pub(crate) fn merge(mut self, other: &SegmentMap) -> Self {
pub(crate) fn merge(mut self, other: &SegmentMap<B>) -> Self {
self.segments.or(&other.segments);
self
}

/// Sets a segment bit corresponding to the given logical block, to the
/// given value.
pub(crate) fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) {
pub fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) {
assert_ne!(self.num_blocks, 0);

let start_seg = self.lbn_to_seg(lbn);
Expand All @@ -76,7 +72,7 @@ impl SegmentMap {
}

/// Returns value of segment bit corresponding to the given logical block.
pub(crate) fn get(&self, lbn: u64) -> Option<bool> {
pub fn get(&self, lbn: u64) -> Option<bool> {
let seg = self.lbn_to_seg(lbn);
self.segments.get(seg)
}
Expand All @@ -93,14 +89,19 @@ impl SegmentMap {
}

/// Counts the total number of dirty blocks.
pub(crate) fn count_dirty_blks(&self) -> u64 {
pub fn count_dirty_blks(&self) -> u64 {
self.count_ones() * self.segment_size / self.block_len
}

/// Get the segment size in blocks.
pub(crate) fn segment_size_blks(&self) -> u64 {
self.segment_size / self.block_len
}

/// Get the full size reference by the bitmap in blocks.
pub(crate) fn size_blks(&self) -> u64 {
self.num_blocks
}
}

impl From<SegmentMap> for BitVec {
Expand Down
115 changes: 76 additions & 39 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use super::{
};

use crate::{
core::SegmentMap,
gen_rebuild_instances,
rebuild::rebuilders::{FullRebuild, RangeRebuilder},
rebuild::rebuilders::{FullRebuild, PartialRebuild, RangeRebuilder},
};

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
Expand All @@ -32,43 +33,93 @@ impl Deref for BdevRebuildJob {
}
}

impl BdevRebuildJob {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// bdev URI's as arguments.
pub async fn new(
/// Builder for the `BdevRebuildJob`, allowing for custom bdev to bdev rebuilds.
#[derive(Default)]
pub struct BdevRebuildJobBuilder {
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: Option<fn(&str, &str) -> ()>,
rebuild_map: Option<SegmentMap>,
}
impl BdevRebuildJobBuilder {
/// Specify a particular range.
pub fn with_range(mut self, range: Range<u64>) -> Self {
self.range = Some(range);
self
}
/// Specify the rebuild options.
pub fn with_option(mut self, options: RebuildJobOptions) -> Self {
self.options = options;
self
}
/// Specify a notification function.
pub fn with_notify_fn(mut self, notify_fn: fn(&str, &str) -> ()) -> Self {
self.notify_fn = Some(notify_fn);
self
}
/// Specify a rebuild map, turning it into a partial rebuild.
pub fn with_bitmap(mut self, rebuild_map: SegmentMap) -> Self {
self.rebuild_map = Some(rebuild_map);
self
}
/// Builds a `BdevRebuildJob` which can be started and which will then
/// rebuild from source to destination.
pub async fn build(
self,
src_uri: &str,
dst_uri: &str,
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: fn(&str, &str) -> (),
) -> Result<Self, RebuildError> {
) -> Result<BdevRebuildJob, RebuildError> {
let descriptor =
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let backend =
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;
RebuildDescriptor::new(src_uri, dst_uri, self.range, self.options)
.await?;
let task_pool = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let notify_fn = self.notify_fn.unwrap_or(|_, _| {});
match self.rebuild_map {
Some(map) => {
descriptor.validate_map(&map)?;
let backend = BdevRebuildJobBackend {
task_pool,
notify_fn,
copier: PartialRebuild::new(map, descriptor),
};
RebuildJob::from_backend(backend).await.map(BdevRebuildJob)
}
None => {
let backend = BdevRebuildJobBackend {
task_pool,
notify_fn,
copier: FullRebuild::new(descriptor),
};
RebuildJob::from_backend(backend).await.map(BdevRebuildJob)
}
}
}
}

RebuildJob::from_backend(backend).await.map(Self)
impl BdevRebuildJob {
/// Helps create a `Self` using a builder: `BdevRebuildJobBuilder`.
pub fn builder() -> BdevRebuildJobBuilder {
BdevRebuildJobBuilder::default()
}
}

gen_rebuild_instances!(BdevRebuildJob);

/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
pub(super) struct BdevRebuildJobBackend<R: RangeRebuilder<RebuildDescriptor>> {
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
copier: FullRebuild<RebuildDescriptor>,
copier: R,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> RebuildBackend
for BdevRebuildJobBackend<R>
{
fn on_state_change(&mut self) {
let desc = self.common_desc();
(self.notify_fn)(&desc.src_uri, &desc.dst_uri);
Expand Down Expand Up @@ -110,33 +161,19 @@ impl RebuildBackend for BdevRebuildJobBackend {
}
}

impl std::fmt::Debug for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> std::fmt::Debug
for BdevRebuildJobBackend<R>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.copier.peek_next())
.finish()
}
}
impl std::fmt::Display for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> std::fmt::Display
for BdevRebuildJobBackend<R>
{
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl BdevRebuildJobBackend {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// URI as arguments.
pub async fn new(
task_pool: RebuildTasks,
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
Ok(Self {
task_pool,
copier: FullRebuild::new(descriptor),
notify_fn,
})
}
}
16 changes: 14 additions & 2 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
CoreError,
IoCompletionStatus,
ReadOptions,
SegmentMap,
},
rebuild::{
rebuild_error::{BdevInvalidUri, NoCopyBuffer},
Expand Down Expand Up @@ -108,7 +109,7 @@ impl RebuildDescriptor {
destination_hdl.get_device(),
&range,
) {
return Err(RebuildError::InvalidParameters {});
return Err(RebuildError::InvalidBdevRange {});
}

let block_size = dst_descriptor.get_device().block_len();
Expand All @@ -128,7 +129,7 @@ impl RebuildDescriptor {
}

/// Check if the source and destination block devices are compatible for
/// rebuild
/// rebuild.
fn validate(
source: &dyn BlockDevice,
destination: &dyn BlockDevice,
Expand All @@ -141,6 +142,17 @@ impl RebuildDescriptor {
&& source.block_len() == destination.block_len()
}

/// Check if the rebuild range is compatible with the rebuild segment map.
pub(crate) fn validate_map(
&self,
map: &SegmentMap,
) -> Result<(), RebuildError> {
if map.size_blks() > self.range.end {
return Err(RebuildError::InvalidMapRange {});
}
Ok(())
}

/// Return the size of the segment to be copied.
#[inline(always)]
pub(super) fn get_segment_size_blks(&self, blk: u64) -> u64 {
Expand Down
6 changes: 4 additions & 2 deletions io-engine/src/rebuild/rebuild_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ pub enum RebuildError {
JobAlreadyExists { job: String },
#[snafu(display("Failed to allocate buffer for the rebuild copy"))]
NoCopyBuffer { source: DmaError },
#[snafu(display("Failed to validate rebuild job creation parameters"))]
InvalidParameters {},
#[snafu(display("Block device size is not compatible"))]
InvalidBdevRange {},
#[snafu(display("Map range is not compatible with block device range"))]
InvalidMapRange {},
#[snafu(display(
"The same device was specified for both source and destination: {bdev}"
))]
Expand Down
7 changes: 6 additions & 1 deletion io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,12 @@ impl RebuildJobBackendManager {
let blocks_remaining = self.backend.blocks_remaining();

let progress = (blocks_recovered * 100) / blocks_total;
assert!(progress < 100 || blocks_remaining == 0);
assert!(
progress < 100 || blocks_remaining == 0,
"progress is {}% but there are {} blocks remaining",
progress,
blocks_remaining
);

RebuildStats {
start_time: descriptor.start_time,
Expand Down
5 changes: 0 additions & 5 deletions io-engine/src/rebuild/rebuild_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ impl RebuildMap {
pub(crate) fn count_dirty_blks(&self) -> u64 {
self.segments.count_dirty_blks()
}

/// Get the rebuild map segment size in blocks.
pub(crate) fn segment_size_blks(&self) -> u64 {
self.segments.segment_size_blks()
}
}

impl From<RebuildMap> for BitVec {
Expand Down
15 changes: 9 additions & 6 deletions io-engine/src/rebuild/rebuilders.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::rebuild::{
rebuild_descriptor::RebuildDescriptor,
rebuild_task::{RebuildTask, RebuildTaskCopier},
RebuildError,
RebuildMap,
use crate::{
core::SegmentMap,
rebuild::{
rebuild_descriptor::RebuildDescriptor,
rebuild_task::{RebuildTask, RebuildTaskCopier},
RebuildError,
RebuildMap,
},
};
use bit_vec::BitVec;
use std::{ops::Range, rc::Rc};
Expand Down Expand Up @@ -84,7 +87,7 @@ impl<T: RebuildTaskCopier> PartialRebuild<T> {
/// Create a partial sequential rebuild with the given copier and segment
/// map.
#[allow(dead_code)]
pub(super) fn new(map: RebuildMap, copier: T) -> Self {
pub(super) fn new(map: SegmentMap, copier: T) -> Self {
let total_blks = map.count_dirty_blks();
let segment_size_blks = map.segment_size_blks();
let bit_vec: BitVec = map.into();
Expand Down
Loading

0 comments on commit 7c42799

Please sign in to comment.