Skip to content

Commit

Permalink
Merge #1581
Browse files Browse the repository at this point in the history
1581: Rebuild refactoring r=tiagolobocastro a=tiagolobocastro

    refactor(rebuild): make the rebuild more generic
    
    A rebuild is essentially a copy from one bdev to another.
    However, we can have different variations on this.
    This change aims to make the rebuild more generic, removing the nexus specific
    bits from the core rebuild logic and allowing us to compose different types of
    rebuild in the future, whilst being able to reuse the shared bits.
    
    To achieve this the rebuild backend is split into a rebuild manager which is the
    generic component responsible for running and managing the rebuild and its tasks.
    We can then implement different rebuild backends as we see fit, example a nexus
    rebuild which locks ranges or a regular bdev to bdev rebuild.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    fix(rebuild): send final rebuild state on drop
    
    Signed-off-by: Tiago Castro <[email protected]>


Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Jan 30, 2024
2 parents 5ed2160 + dc53c12 commit 0ab97bd
Show file tree
Hide file tree
Showing 18 changed files with 1,064 additions and 603 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/pr-commitlint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,14 @@ jobs:
if [ ! ${{ github.ref }} = "refs/heads/staging" ]; then
first_commit=${{ github.event.pull_request.base.sha }}
last_commit=${{ github.event.pull_request.head.sha }}
# Ensure code-review commits don't get merged
sed "s/code-review-rule': \[0/code-review-rule': [2/g" -i commitlint.config.js
npx commitlint --from $first_commit --to $last_commit -V
git log --pretty=format:%s $first_commit..$last_commit > ./subjects
duplicates="$(cat ./subjects | sort | uniq -D)"
if [ "$duplicates" != "" ]; then
echo -e "Duplicate commits found:\n$duplicates" >&2
exit 1
fi
fi
21 changes: 18 additions & 3 deletions commitlint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@ module.exports = {
extends: ['@commitlint/config-conventional'],
rules: {
'type-enum': [2, 'always', ['build', 'chore', 'ci', 'docs', 'feat', 'fix', 'perf', 'refactor', 'revert', 'style', 'test', 'example']],
'code-review-rule': [0, 'always'],
},
defaultIgnores: false,
ignores: [
(message) => message.startsWith('chore(bors): merge pull request #'),
(message) => message.startsWith('Merge #')
]
(message) => message.startsWith('chore(bors): merge pull request #'),
(message) => message.startsWith('Merge #')
],
plugins: [
{
rules: {
'code-review-rule': ({subject}) => {
const REVIEW_COMMENTS = `Please don't merge code-review commits, instead squash them in the parent commit`;
if (subject.includes('code-review')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('review comment')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('address comment')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('addressed comment')) return [ false, REVIEW_COMMENTS ];
return [ true ];
},
},
},
],
}
6 changes: 3 additions & 3 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use io_engine::{
core::{MayastorEnvironment, Mthread},
logger,
logger::LogFormat,
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

pub mod bdev;
Expand Down Expand Up @@ -457,7 +457,7 @@ pub async fn wait_for_rebuild(
timeout: Duration,
) {
let (s, r) = unbounded::<()>();
let job = match RebuildJob::lookup(&dst_uri) {
let job = match NexusRebuildJob::lookup(&dst_uri) {
Ok(job) => job,
Err(_) => return,
};
Expand Down Expand Up @@ -490,7 +490,7 @@ pub async fn wait_for_rebuild(
error
});
reactor_poll!(r);
if let Ok(job) = RebuildJob::lookup(&dst_uri) {
if let Ok(job) = NexusRebuildJob::lookup(&dst_uri) {
job.stats().await;
}
t.join().unwrap().unwrap();
Expand Down
22 changes: 11 additions & 11 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
eventing::{EventMetaGen, EventWithMeta},
rebuild::{
HistoryRecord,
NexusRebuildJob,
RebuildError,
RebuildJob,
RebuildJobOptions,
RebuildState,
RebuildStats,
Expand Down Expand Up @@ -139,8 +139,8 @@ impl<'n> Nexus<'n> {
self.reconfigure(DrEvent::ChildRebuild).await;

// Stop the I/O log and create a rebuild map from it.
// As this is done after the reconfiguraion, any new write I/Os will
// now reach the destionation child, and no rebuild will be required
// As this is done after the reconfiguration, any new write I/Os will
// now reach the destination child, and no rebuild will be required
// for them.
let map = self
.lookup_child(&dst_child_uri)
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'n> Nexus<'n> {
verify_mode,
};

RebuildJob::new(
NexusRebuildJob::new(
&self.name,
src_child_uri,
dst_child_uri,
Expand All @@ -202,7 +202,7 @@ impl<'n> Nexus<'n> {
},
)
.await
.and_then(RebuildJob::store)
.and_then(NexusRebuildJob::store)
.context(nexus_err::CreateRebuild {
child: dst_child_uri.to_owned(),
name: self.name.clone(),
Expand All @@ -211,7 +211,7 @@ impl<'n> Nexus<'n> {

/// Translates the job into a new history record and pushes into
/// the history.
fn create_history_record(&self, job: Arc<RebuildJob>) {
fn create_history_record(&self, job: Arc<NexusRebuildJob>) {
let Some(rec) = job.history_record() else {
error!("{self:?}: try to get history record on unfinished job");
return;
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<'n> Nexus<'n> {
pub async fn cancel_rebuild_jobs(&self, src_uri: &str) -> Vec<String> {
info!("{:?}: cancel rebuild jobs from '{}'...", self, src_uri);

let src_jobs = RebuildJob::lookup_src(src_uri);
let src_jobs = NexusRebuildJob::lookup_src(src_uri);
let mut terminated_jobs = Vec::new();
let mut rebuilding_children = Vec::new();

Expand Down Expand Up @@ -375,8 +375,8 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job(
&self,
dst_child_uri: &str,
) -> Result<std::sync::Arc<RebuildJob>, Error> {
RebuildJob::lookup(dst_child_uri).map_err(|_| {
) -> Result<std::sync::Arc<NexusRebuildJob>, Error> {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name: self.name.to_owned(),
Expand All @@ -389,9 +389,9 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job_mut(
&self,
dst_child_uri: &str,
) -> Result<Arc<RebuildJob>, Error> {
) -> Result<Arc<NexusRebuildJob>, Error> {
let name = self.name.clone();
RebuildJob::lookup(dst_child_uri).map_err(|_| {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name,
Expand Down
12 changes: 7 additions & 5 deletions io-engine/src/bdev/nexus/nexus_child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
VerboseError,
},
persistent_store::PersistentStore,
rebuild::{RebuildJob, RebuildMap},
rebuild::{NexusRebuildJob, RebuildMap},
};

use crate::{
Expand Down Expand Up @@ -1199,13 +1199,15 @@ impl<'c> NexusChild<'c> {
/// TODO
pub(super) fn remove_rebuild_job(
&self,
) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::remove(&self.name).ok()
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::remove(&self.name).ok()
}

/// Return the rebuild job which is rebuilding this child, if rebuilding.
pub(crate) fn rebuild_job(&self) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::lookup(&self.name).ok()
pub(crate) fn rebuild_job(
&self,
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::lookup(&self.name).ok()
}

/// Return the rebuild progress on this child, if rebuilding.
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/eventing/nexus_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::{
bdev::{nexus, nexus::NexusChild},
core::{MayastorEnvironment, VerboseError},
eventing::{Event, EventMetaGen, EventWithMeta},
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

impl EventMetaGen for RebuildJob {
impl EventMetaGen for NexusRebuildJob {
fn meta(&self) -> EventMeta {
let rebuild_status = match self.state() {
RebuildState::Init | RebuildState::Running => {
Expand Down
6 changes: 3 additions & 3 deletions io-engine/src/grpc/v0/nexus_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
PtplFileOps,
},
core::{Protocol, Share},
rebuild::RebuildJob,
rebuild::NexusRebuildJob,
};

fn map_fault_reason(r: FaultReason) -> ChildStateReason {
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
allowed_hosts: self.allowed_hosts(),
}
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
ana_state: ana_state as i32,
allowed_hosts: self.allowed_hosts(),
}
Expand Down
144 changes: 144 additions & 0 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::{
ops::{Deref, Range},
rc::Rc,
};

use super::{
rebuild_descriptor::RebuildDescriptor,
rebuild_error::RebuildError,
rebuild_job_backend::RebuildBackend,
rebuild_task::{RebuildTasks, TaskResult},
RebuildJob,
RebuildJobOptions,
SEGMENT_TASKS,
};

use crate::gen_rebuild_instances;

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
/// from source_hdl and writes into destination_hdl from specified start to end.
pub struct BdevRebuildJob(RebuildJob);

impl std::fmt::Debug for BdevRebuildJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Deref for BdevRebuildJob {
type Target = RebuildJob;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl BdevRebuildJob {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// bdev URI's as arguments.
pub async fn new(
src_uri: &str,
dst_uri: &str,
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: fn(&str, &str) -> (),
) -> Result<Self, RebuildError> {
let descriptor =
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let backend =
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;

RebuildJob::from_backend(backend).await.map(Self)
}
}

gen_rebuild_instances!(BdevRebuildJob);

/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
/// The next block to be rebuilt.
next: u64,
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
descriptor: Rc<RebuildDescriptor>,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
fn on_state_change(&mut self) {
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
}

fn common_desc(&self) -> &RebuildDescriptor {
&self.descriptor
}

fn task_pool(&self) -> &RebuildTasks {
&self.task_pool
}

fn schedule_task_by_id(&mut self, id: usize) -> bool {
if self.next >= self.descriptor.range.end {
false
} else {
let next = std::cmp::min(
self.next + self.descriptor.segment_size_blks,
self.descriptor.range.end,
);
self.task_pool.schedule_segment_rebuild(
id,
self.next,
self.descriptor.clone(),
);
self.task_pool.active += 1;
self.next = next;
true
}
}

async fn await_one_task(&mut self) -> Option<TaskResult> {
self.task_pool.await_one_task().await
}
}

impl std::fmt::Debug for BdevRebuildJobBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.next)
.finish()
}
}
impl std::fmt::Display for BdevRebuildJobBackend {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl BdevRebuildJobBackend {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// URI as arguments.
pub async fn new(
task_pool: RebuildTasks,
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
let be = Self {
next: descriptor.range.start,
task_pool,
descriptor: Rc::new(descriptor),
notify_fn,
};

info!("{be}: backend created");

Ok(be)
}
}
Loading

0 comments on commit 0ab97bd

Please sign in to comment.