Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebuild refactoring #1581

Merged
merged 4 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/pr-commitlint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,14 @@ jobs:
if [ ! ${{ github.ref }} = "refs/heads/staging" ]; then
first_commit=${{ github.event.pull_request.base.sha }}
last_commit=${{ github.event.pull_request.head.sha }}
# Ensure code-review commits don't get merged
sed "s/code-review-rule': \[0/code-review-rule': [2/g" -i commitlint.config.js
npx commitlint --from $first_commit --to $last_commit -V

git log --pretty=format:%s $first_commit..$last_commit > ./subjects
duplicates="$(cat ./subjects | sort | uniq -D)"
if [ "$duplicates" != "" ]; then
echo -e "Duplicate commits found:\n$duplicates" >&2
exit 1
fi
fi
21 changes: 18 additions & 3 deletions commitlint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@ module.exports = {
extends: ['@commitlint/config-conventional'],
rules: {
'type-enum': [2, 'always', ['build', 'chore', 'ci', 'docs', 'feat', 'fix', 'perf', 'refactor', 'revert', 'style', 'test', 'example']],
'code-review-rule': [0, 'always'],
},
defaultIgnores: false,
ignores: [
(message) => message.startsWith('chore(bors): merge pull request #'),
(message) => message.startsWith('Merge #')
]
(message) => message.startsWith('chore(bors): merge pull request #'),
(message) => message.startsWith('Merge #')
],
plugins: [
{
rules: {
'code-review-rule': ({subject}) => {
const REVIEW_COMMENTS = `Please don't merge code-review commits, instead squash them in the parent commit`;
if (subject.includes('code-review')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('review comment')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('address comment')) return [ false, REVIEW_COMMENTS ];
if (subject.includes('addressed comment')) return [ false, REVIEW_COMMENTS ];
return [ true ];
},
},
},
],
}
6 changes: 3 additions & 3 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use io_engine::{
core::{MayastorEnvironment, Mthread},
logger,
logger::LogFormat,
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

pub mod bdev;
Expand Down Expand Up @@ -457,7 +457,7 @@ pub async fn wait_for_rebuild(
timeout: Duration,
) {
let (s, r) = unbounded::<()>();
let job = match RebuildJob::lookup(&dst_uri) {
let job = match NexusRebuildJob::lookup(&dst_uri) {
Ok(job) => job,
Err(_) => return,
};
Expand Down Expand Up @@ -490,7 +490,7 @@ pub async fn wait_for_rebuild(
error
});
reactor_poll!(r);
if let Ok(job) = RebuildJob::lookup(&dst_uri) {
if let Ok(job) = NexusRebuildJob::lookup(&dst_uri) {
job.stats().await;
}
t.join().unwrap().unwrap();
Expand Down
22 changes: 11 additions & 11 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
eventing::{EventMetaGen, EventWithMeta},
rebuild::{
HistoryRecord,
NexusRebuildJob,
RebuildError,
RebuildJob,
RebuildJobOptions,
RebuildState,
RebuildStats,
Expand Down Expand Up @@ -139,8 +139,8 @@ impl<'n> Nexus<'n> {
self.reconfigure(DrEvent::ChildRebuild).await;

// Stop the I/O log and create a rebuild map from it.
// As this is done after the reconfiguraion, any new write I/Os will
// now reach the destionation child, and no rebuild will be required
// As this is done after the reconfiguration, any new write I/Os will
// now reach the destination child, and no rebuild will be required
// for them.
let map = self
.lookup_child(&dst_child_uri)
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'n> Nexus<'n> {
verify_mode,
};

RebuildJob::new(
NexusRebuildJob::new(
&self.name,
src_child_uri,
dst_child_uri,
Expand All @@ -202,7 +202,7 @@ impl<'n> Nexus<'n> {
},
)
.await
.and_then(RebuildJob::store)
.and_then(NexusRebuildJob::store)
.context(nexus_err::CreateRebuild {
child: dst_child_uri.to_owned(),
name: self.name.clone(),
Expand All @@ -211,7 +211,7 @@ impl<'n> Nexus<'n> {

/// Translates the job into a new history record and pushes into
/// the history.
fn create_history_record(&self, job: Arc<RebuildJob>) {
fn create_history_record(&self, job: Arc<NexusRebuildJob>) {
let Some(rec) = job.history_record() else {
error!("{self:?}: try to get history record on unfinished job");
return;
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<'n> Nexus<'n> {
pub async fn cancel_rebuild_jobs(&self, src_uri: &str) -> Vec<String> {
info!("{:?}: cancel rebuild jobs from '{}'...", self, src_uri);

let src_jobs = RebuildJob::lookup_src(src_uri);
let src_jobs = NexusRebuildJob::lookup_src(src_uri);
let mut terminated_jobs = Vec::new();
let mut rebuilding_children = Vec::new();

Expand Down Expand Up @@ -375,8 +375,8 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job(
&self,
dst_child_uri: &str,
) -> Result<std::sync::Arc<RebuildJob>, Error> {
RebuildJob::lookup(dst_child_uri).map_err(|_| {
) -> Result<std::sync::Arc<NexusRebuildJob>, Error> {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name: self.name.to_owned(),
Expand All @@ -389,9 +389,9 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job_mut(
&self,
dst_child_uri: &str,
) -> Result<Arc<RebuildJob>, Error> {
) -> Result<Arc<NexusRebuildJob>, Error> {
let name = self.name.clone();
RebuildJob::lookup(dst_child_uri).map_err(|_| {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name,
Expand Down
12 changes: 7 additions & 5 deletions io-engine/src/bdev/nexus/nexus_child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
VerboseError,
},
persistent_store::PersistentStore,
rebuild::{RebuildJob, RebuildMap},
rebuild::{NexusRebuildJob, RebuildMap},
};

use crate::{
Expand Down Expand Up @@ -1199,13 +1199,15 @@ impl<'c> NexusChild<'c> {
/// TODO
pub(super) fn remove_rebuild_job(
&self,
) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::remove(&self.name).ok()
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::remove(&self.name).ok()
}

/// Return the rebuild job which is rebuilding this child, if rebuilding.
pub(crate) fn rebuild_job(&self) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::lookup(&self.name).ok()
pub(crate) fn rebuild_job(
&self,
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::lookup(&self.name).ok()
}

/// Return the rebuild progress on this child, if rebuilding.
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/eventing/nexus_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::{
bdev::{nexus, nexus::NexusChild},
core::{MayastorEnvironment, VerboseError},
eventing::{Event, EventMetaGen, EventWithMeta},
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

impl EventMetaGen for RebuildJob {
impl EventMetaGen for NexusRebuildJob {
fn meta(&self) -> EventMeta {
let rebuild_status = match self.state() {
RebuildState::Init | RebuildState::Running => {
Expand Down
6 changes: 3 additions & 3 deletions io-engine/src/grpc/v0/nexus_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
PtplFileOps,
},
core::{Protocol, Share},
rebuild::RebuildJob,
rebuild::NexusRebuildJob,
};

fn map_fault_reason(r: FaultReason) -> ChildStateReason {
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
allowed_hosts: self.allowed_hosts(),
}
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
ana_state: ana_state as i32,
allowed_hosts: self.allowed_hosts(),
}
Expand Down
144 changes: 144 additions & 0 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::{
ops::{Deref, Range},
rc::Rc,
};

use super::{
rebuild_descriptor::RebuildDescriptor,
rebuild_error::RebuildError,
rebuild_job_backend::RebuildBackend,
rebuild_task::{RebuildTasks, TaskResult},
RebuildJob,
RebuildJobOptions,
SEGMENT_TASKS,
};

use crate::gen_rebuild_instances;

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
/// from source_hdl and writes into destination_hdl from specified start to end.
pub struct BdevRebuildJob(RebuildJob);

impl std::fmt::Debug for BdevRebuildJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Deref for BdevRebuildJob {
type Target = RebuildJob;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl BdevRebuildJob {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// bdev URI's as arguments.
pub async fn new(
src_uri: &str,
dst_uri: &str,
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: fn(&str, &str) -> (),
) -> Result<Self, RebuildError> {
let descriptor =
tiagolobocastro marked this conversation as resolved.
Show resolved Hide resolved
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let backend =
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;

RebuildJob::from_backend(backend).await.map(Self)
}
}

gen_rebuild_instances!(BdevRebuildJob);

/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
/// The next block to be rebuilt.
next: u64,
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
descriptor: Rc<RebuildDescriptor>,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
fn on_state_change(&mut self) {
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
}

fn common_desc(&self) -> &RebuildDescriptor {
&self.descriptor
}

fn task_pool(&self) -> &RebuildTasks {
&self.task_pool
}

fn schedule_task_by_id(&mut self, id: usize) -> bool {
if self.next >= self.descriptor.range.end {
false
} else {
let next = std::cmp::min(
self.next + self.descriptor.segment_size_blks,
self.descriptor.range.end,
);
self.task_pool.schedule_segment_rebuild(
id,
self.next,
self.descriptor.clone(),
);
self.task_pool.active += 1;
self.next = next;
true
}
}

async fn await_one_task(&mut self) -> Option<TaskResult> {
self.task_pool.await_one_task().await
}
}

impl std::fmt::Debug for BdevRebuildJobBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.next)
.finish()
}
}
impl std::fmt::Display for BdevRebuildJobBackend {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
dsharma-dc marked this conversation as resolved.
Show resolved Hide resolved

impl BdevRebuildJobBackend {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// URI as arguments.
pub async fn new(
task_pool: RebuildTasks,
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
let be = Self {
next: descriptor.range.start,
task_pool,
descriptor: Rc::new(descriptor),
notify_fn,
};

info!("{be}: backend created");

Ok(be)
}
}
Loading
Loading