Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rebuild/bdev): use builder pattern #1604

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ mod nic;
pub mod partition;
mod reactor;
pub mod runtime;
pub(crate) mod segment_map;
pub mod segment_map;
mod share;
pub mod snapshot;
pub(crate) mod thread;
Expand Down
31 changes: 16 additions & 15 deletions io-engine/src/core/segment_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bit_vec::BitVec;
use bit_vec::{BitBlock, BitVec};
use std::fmt::{Debug, Formatter};

// Returns ceil of an integer division.
Expand All @@ -10,10 +10,10 @@ fn div_ceil(a: u64, b: u64) -> u64 {
/// It marks every segment as a clean (no need to rebuild, or already
/// transferred), or dirty (need to transfer from a healthy device).
#[derive(Clone)]
pub(crate) struct SegmentMap {
pub struct SegmentMap<B: BitBlock = u32> {
/// Bitmap of rebuild segments of a device. Zeros indicate clean segments,
/// ones mark dirty ones.
segments: BitVec,
segments: BitVec<B>,
/// Device size in segments.
num_segments: u64,
/// Device size in blocks.
Expand All @@ -24,7 +24,7 @@ pub(crate) struct SegmentMap {
segment_size: u64,
}

impl Debug for SegmentMap {
impl<B: BitBlock> Debug for SegmentMap<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand All @@ -37,15 +37,11 @@ impl Debug for SegmentMap {
}
}

impl SegmentMap {
impl<B: BitBlock> SegmentMap<B> {
/// Creates a new segment map with the given parameters.
pub(crate) fn new(
num_blocks: u64,
block_len: u64,
segment_size: u64,
) -> Self {
pub fn new(num_blocks: u64, block_len: u64, segment_size: u64) -> Self {
let num_segments = div_ceil(num_blocks * block_len, segment_size);
let mut segments = BitVec::new();
let mut segments = BitVec::<B>::default();
segments.grow(num_segments as usize, false);
Self {
segments,
Expand All @@ -57,14 +53,14 @@ impl SegmentMap {
}

/// Merges (bitwise OR) this map with another.
pub(crate) fn merge(mut self, other: &SegmentMap) -> Self {
pub(crate) fn merge(mut self, other: &SegmentMap<B>) -> Self {
self.segments.or(&other.segments);
self
}

/// Sets a segment bit corresponding to the given logical block, to the
/// given value.
pub(crate) fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) {
pub fn set(&mut self, lbn: u64, lbn_cnt: u64, value: bool) {
assert_ne!(self.num_blocks, 0);

let start_seg = self.lbn_to_seg(lbn);
Expand All @@ -76,7 +72,7 @@ impl SegmentMap {
}

/// Returns value of segment bit corresponding to the given logical block.
pub(crate) fn get(&self, lbn: u64) -> Option<bool> {
pub fn get(&self, lbn: u64) -> Option<bool> {
let seg = self.lbn_to_seg(lbn);
self.segments.get(seg)
}
Expand All @@ -93,14 +89,19 @@ impl SegmentMap {
}

/// Counts the total number of dirty blocks.
pub(crate) fn count_dirty_blks(&self) -> u64 {
pub fn count_dirty_blks(&self) -> u64 {
self.count_ones() * self.segment_size / self.block_len
}

/// Get the segment size in blocks.
pub(crate) fn segment_size_blks(&self) -> u64 {
self.segment_size / self.block_len
}

/// Get the full size reference by the bitmap in blocks.
pub(crate) fn size_blks(&self) -> u64 {
self.num_blocks
}
}

impl From<SegmentMap> for BitVec {
Expand Down
115 changes: 76 additions & 39 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use super::{
};

use crate::{
core::SegmentMap,
gen_rebuild_instances,
rebuild::rebuilders::{FullRebuild, RangeRebuilder},
rebuild::rebuilders::{FullRebuild, PartialRebuild, RangeRebuilder},
};

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
Expand All @@ -32,43 +33,93 @@ impl Deref for BdevRebuildJob {
}
}

impl BdevRebuildJob {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// bdev URI's as arguments.
pub async fn new(
/// Builder for the `BdevRebuildJob`, allowing for custom bdev to bdev rebuilds.
#[derive(Default)]
pub struct BdevRebuildJobBuilder {
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: Option<fn(&str, &str) -> ()>,
rebuild_map: Option<SegmentMap>,
}
impl BdevRebuildJobBuilder {
/// Specify a particular range.
pub fn with_range(mut self, range: Range<u64>) -> Self {
self.range = Some(range);
self
}
/// Specify the rebuild options.
pub fn with_option(mut self, options: RebuildJobOptions) -> Self {
self.options = options;
self
}
/// Specify a notification function.
pub fn with_notify_fn(mut self, notify_fn: fn(&str, &str) -> ()) -> Self {
self.notify_fn = Some(notify_fn);
self
}
/// Specify a rebuild map, turning it into a partial rebuild.
pub fn with_bitmap(mut self, rebuild_map: SegmentMap) -> Self {
self.rebuild_map = Some(rebuild_map);
self
}
/// Builds a `BdevRebuildJob` which can be started and which will then
/// rebuild from source to destination.
pub async fn build(
self,
src_uri: &str,
dst_uri: &str,
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: fn(&str, &str) -> (),
) -> Result<Self, RebuildError> {
) -> Result<BdevRebuildJob, RebuildError> {
let descriptor =
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let backend =
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;
RebuildDescriptor::new(src_uri, dst_uri, self.range, self.options)
.await?;
let task_pool = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let notify_fn = self.notify_fn.unwrap_or(|_, _| {});
match self.rebuild_map {
Some(map) => {
descriptor.validate_map(&map)?;
let backend = BdevRebuildJobBackend {
task_pool,
notify_fn,
copier: PartialRebuild::new(map, descriptor),
};
RebuildJob::from_backend(backend).await.map(BdevRebuildJob)
}
None => {
let backend = BdevRebuildJobBackend {
task_pool,
notify_fn,
copier: FullRebuild::new(descriptor),
};
RebuildJob::from_backend(backend).await.map(BdevRebuildJob)
}
}
}
}

RebuildJob::from_backend(backend).await.map(Self)
impl BdevRebuildJob {
/// Helps create a `Self` using a builder: `BdevRebuildJobBuilder`.
pub fn builder() -> BdevRebuildJobBuilder {
BdevRebuildJobBuilder::default()
}
}

gen_rebuild_instances!(BdevRebuildJob);

/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
pub(super) struct BdevRebuildJobBackend<R: RangeRebuilder<RebuildDescriptor>> {
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
copier: FullRebuild<RebuildDescriptor>,
copier: R,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> RebuildBackend
for BdevRebuildJobBackend<R>
{
fn on_state_change(&mut self) {
let desc = self.common_desc();
(self.notify_fn)(&desc.src_uri, &desc.dst_uri);
Expand Down Expand Up @@ -110,33 +161,19 @@ impl RebuildBackend for BdevRebuildJobBackend {
}
}

impl std::fmt::Debug for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> std::fmt::Debug
for BdevRebuildJobBackend<R>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.copier.peek_next())
.finish()
}
}
impl std::fmt::Display for BdevRebuildJobBackend {
impl<R: RangeRebuilder<RebuildDescriptor>> std::fmt::Display
for BdevRebuildJobBackend<R>
{
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl BdevRebuildJobBackend {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// URI as arguments.
pub async fn new(
task_pool: RebuildTasks,
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
Ok(Self {
task_pool,
copier: FullRebuild::new(descriptor),
notify_fn,
})
}
}
16 changes: 14 additions & 2 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
CoreError,
IoCompletionStatus,
ReadOptions,
SegmentMap,
},
rebuild::{
rebuild_error::{BdevInvalidUri, NoCopyBuffer},
Expand Down Expand Up @@ -108,7 +109,7 @@ impl RebuildDescriptor {
destination_hdl.get_device(),
&range,
) {
return Err(RebuildError::InvalidParameters {});
return Err(RebuildError::InvalidSrcDstRange {});
}

let block_size = dst_descriptor.get_device().block_len();
Expand All @@ -128,7 +129,7 @@ impl RebuildDescriptor {
}

/// Check if the source and destination block devices are compatible for
/// rebuild
/// rebuild.
fn validate(
source: &dyn BlockDevice,
destination: &dyn BlockDevice,
Expand All @@ -141,6 +142,17 @@ impl RebuildDescriptor {
&& source.block_len() == destination.block_len()
}

/// Check if the rebuild range is compatible with the rebuild segment map.
pub(crate) fn validate_map(
&self,
map: &SegmentMap,
) -> Result<(), RebuildError> {
if map.size_blks() > self.range.end {
return Err(RebuildError::InvalidMapRange {});
}
Ok(())
}

/// Return the size of the segment to be copied.
#[inline(always)]
pub(super) fn get_segment_size_blks(&self, blk: u64) -> u64 {
Expand Down
6 changes: 4 additions & 2 deletions io-engine/src/rebuild/rebuild_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ pub enum RebuildError {
JobAlreadyExists { job: String },
#[snafu(display("Failed to allocate buffer for the rebuild copy"))]
NoCopyBuffer { source: DmaError },
#[snafu(display("Failed to validate rebuild job creation parameters"))]
InvalidParameters {},
#[snafu(display("Source and Destination size range is not compatible"))]
InvalidSrcDstRange {},
#[snafu(display("Map range is not compatible with rebuild range"))]
InvalidMapRange {},
#[snafu(display(
"The same device was specified for both source and destination: {bdev}"
))]
Expand Down
7 changes: 6 additions & 1 deletion io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,12 @@ impl RebuildJobBackendManager {
let blocks_remaining = self.backend.blocks_remaining();

let progress = (blocks_recovered * 100) / blocks_total;
assert!(progress < 100 || blocks_remaining == 0);
assert!(
progress < 100 || blocks_remaining == 0,
"progress is {}% but there are {} blocks remaining",
progress,
blocks_remaining
);

RebuildStats {
start_time: descriptor.start_time,
Expand Down
5 changes: 0 additions & 5 deletions io-engine/src/rebuild/rebuild_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ impl RebuildMap {
pub(crate) fn count_dirty_blks(&self) -> u64 {
self.segments.count_dirty_blks()
}

/// Get the rebuild map segment size in blocks.
pub(crate) fn segment_size_blks(&self) -> u64 {
self.segments.segment_size_blks()
}
}

impl From<RebuildMap> for BitVec {
Expand Down
15 changes: 9 additions & 6 deletions io-engine/src/rebuild/rebuilders.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::rebuild::{
rebuild_descriptor::RebuildDescriptor,
rebuild_task::{RebuildTask, RebuildTaskCopier},
RebuildError,
RebuildMap,
use crate::{
core::SegmentMap,
rebuild::{
rebuild_descriptor::RebuildDescriptor,
rebuild_task::{RebuildTask, RebuildTaskCopier},
RebuildError,
RebuildMap,
},
};
use bit_vec::BitVec;
use std::{ops::Range, rc::Rc};
Expand Down Expand Up @@ -84,7 +87,7 @@ impl<T: RebuildTaskCopier> PartialRebuild<T> {
/// Create a partial sequential rebuild with the given copier and segment
/// map.
#[allow(dead_code)]
pub(super) fn new(map: RebuildMap, copier: T) -> Self {
pub(super) fn new(map: SegmentMap, copier: T) -> Self {
let total_blks = map.count_dirty_blks();
let segment_size_blks = map.segment_size_blks();
let bit_vec: BitVec = map.into();
Expand Down
Loading
Loading