Skip to content

Commit

Permalink
Merge #1558
Browse files Browse the repository at this point in the history
1558: Cherry-pick #1556 and #1557 r=tiagolobocastro a=tiagolobocastro

    ci(gha): fix submodule check
    
    Ensures we fetch git history and submodules.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

1557: Feat: Cksum a replica r=tiagolobocastro a=tiagolobocastro

    feat(client/test): expose test features
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    feat(test/features): export test features
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    feat(client/cksum): add cksum to the io-engine client
    
    Updates method argument with checksum.
    Adds the cksum as another column.
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    feat(wipe/cksum): add replica cksum
    
    Makes use of the existing wipe infra to cksum a bdev.
    This means we keep existing bdev iteration and streaming notifications
    with a very simple change.
    todo: rename wipe and abstract wipe and hash a bit better
    
    Currently we add support for crc32c only
    
    Signed-off-by: Tiago Castro <[email protected]>

1556: feat(eventing): host event specific details r=datacore-vvarakantham a=datacore-vvarakantham

Added host event specific details.

Co-authored-by: Vandana Varakantham <[email protected]>
Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
3 people committed Dec 8, 2023
2 parents 1ea7f8c + ef48351 commit 5ccb082
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/pr-submodule-branch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
submodules: recursive
- name: Check root submodules branch
run: |
pr_branch="${{ github.event.pull_request.base.ref }}"
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/bdev/lvs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Lvs {
name: bdev.name().into(),
})?;

let wiper = crate::core::wiper::Wiper::new(
let mut wiper = crate::core::wiper::Wiper::new(
hdl,
crate::core::wiper::WipeMethod::WriteZeroes,
)
Expand Down
61 changes: 59 additions & 2 deletions io-engine/src/bin/io-engine-client/v1/test_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use strum_macros::{AsRefStr, EnumString, EnumVariantNames};
use tonic::Status;

pub fn subcommands() -> Command {
let features = Command::new("features").about("Get the test features");

let inject = Command::new("inject")
.about("manage fault injections")
.arg(
Expand Down Expand Up @@ -89,6 +91,7 @@ pub fn subcommands() -> Command {
.subcommand_required(true)
.arg_required_else_help(true)
.about("Test management")
.subcommand(features)
.subcommand(inject)
.subcommand(wipe)
}
Expand All @@ -106,11 +109,18 @@ impl Resource {

#[derive(EnumString, EnumVariantNames)]
#[strum(serialize_all = "PascalCase")]
enum CheckSumAlg {
Crc32c,
}

#[derive(EnumString, EnumVariantNames, Clone, Copy)]
#[strum(serialize_all = "PascalCase")]
enum WipeMethod {
None,
WriteZeroes,
Unmap,
WritePattern,
CheckSum,
}
impl WipeMethod {
fn methods() -> &'static [&'static str] {
Expand All @@ -124,13 +134,20 @@ impl From<WipeMethod> for v1_rpc::test::wipe_options::WipeMethod {
WipeMethod::WriteZeroes => Self::WriteZeroes,
WipeMethod::Unmap => Self::Unmap,
WipeMethod::WritePattern => Self::WritePattern,
WipeMethod::CheckSum => Self::Checksum,
}
}
}
impl From<WipeMethod> for v1_rpc::test::wipe_options::CheckSumAlgorithm {
fn from(_: WipeMethod) -> Self {
v1_rpc::test::wipe_options::CheckSumAlgorithm::Crc32c
}
}

pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
match matches.subcommand().unwrap() {
("inject", args) => injections(ctx, args).await,
("features", args) => features(ctx, args).await,
("wipe", args) => wipe(ctx, args).await,
(cmd, _) => {
Err(Status::not_found(format!("command {cmd} does not exist")))
Expand All @@ -139,6 +156,23 @@ pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
}
}

async fn features(
mut ctx: Context,
_matches: &ArgMatches,
) -> crate::Result<()> {
let response = ctx.v1.test.get_features(()).await.context(GrpcStatus)?;
let features = response.into_inner();
match ctx.output {
OutputFormat::Json => {
println!("{}", serde_json::to_string_pretty(&features).unwrap());
}
OutputFormat::Default => {
println!("{features:#?}");
}
}
Ok(())
}

async fn wipe(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
let resource = matches
.get_one::<String>("resource")
Expand Down Expand Up @@ -191,6 +225,7 @@ async fn replica_wipe(
)
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;

let response = ctx
.v1
.test
Expand All @@ -203,6 +238,10 @@ async fn replica_wipe(
method,
) as i32,
write_pattern: None,
cksum_alg:
v1_rpc::test::wipe_options::CheckSumAlgorithm::from(
method,
) as i32,
}),
chunk_size: chunk_size.get_bytes() as u64,
}),
Expand All @@ -213,7 +252,7 @@ async fn replica_wipe(
let mut resp = response.into_inner();

fn bandwidth(response: &v1_rpc::test::WipeReplicaResponse) -> String {
let unknown = "??".to_string();
let unknown = String::new();
let Some(Ok(elapsed)) = response
.since
.clone()
Expand All @@ -233,6 +272,18 @@ async fn replica_wipe(
)
}

fn checksum(response: &v1_rpc::test::WipeReplicaResponse) -> String {
response
.checksum
.clone()
.map(|c| match c {
v1_rpc::test::wipe_replica_response::Checksum::Crc32(crc) => {
format!("{crc:#x}")
}
})
.unwrap_or_default()
}

match ctx.output {
OutputFormat::Json => {
while let Some(response) = resp.next().await {
Expand All @@ -257,13 +308,18 @@ async fn replica_wipe(
"WIPED_CHUNKS",
"REMAINING_BYTES",
"BANDWIDTH",
"CHECKSUM",
];

let (s, r) = tokio::sync::mpsc::channel(10);
tokio::spawn(async move {
while let Some(response) = resp.next().await {
let response = response.map(|response| {
let bandwidth = bandwidth(&response);
// back fill with spaces with ensure checksum aligns
// with its header
let bandwidth =
format!("{: <12}", bandwidth(&response));
let checksum = checksum(&response);
vec![
response.uuid,
adjust_bytes(response.total_bytes),
Expand All @@ -274,6 +330,7 @@ async fn replica_wipe(
response.wiped_chunks.to_string(),
adjust_bytes(response.remaining_bytes),
bandwidth,
checksum,
]
});
s.send(response).await.unwrap();
Expand Down
78 changes: 71 additions & 7 deletions io-engine/src/core/wiper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::core::{CoreError, UntypedBdevHandle};
use snafu::Snafu;
use std::{fmt::Debug, ops::Deref};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};

/// The Error for the Wiper.
#[derive(Clone, Debug, Snafu)]
Expand Down Expand Up @@ -89,6 +92,22 @@ pub enum WipeMethod {
/// When using WRITE_PATTERN, wipe using this 32bit write pattern, example:
/// 0xDEADBEEF.
WritePattern(u32),
/// Don't actually wipe, just take the checksum.
CkSum(CkSumMethod),
}

/// Wipe method, allowing for some flexibility.
#[derive(Debug, Clone, Copy)]
pub enum CkSumMethod {
/// Don't actually wipe, just pretend.
Crc32 { crc32c: u32 },
}
impl Default for CkSumMethod {
fn default() -> Self {
Self::Crc32 {
crc32c: spdk_rs::libspdk::SPDK_CRC32C_INITIAL,
}
}
}

/// Final Wipe stats.
Expand All @@ -114,9 +133,14 @@ impl FinalWipeStats {
};

tracing::warn!(
"Wiped {} => {:.3?} => {bandwidth}/s",
"Wiped {} => {:.3?} => {bandwidth}/s{}",
self.stats.uuid,
elapsed
elapsed,
if let Some(crc) = stats.cksum_crc32c {
format!(" => {crc:#x}")
} else {
String::new()
}
);
}
}
Expand All @@ -138,6 +162,11 @@ impl Deref for WipeStats {
&self.stats
}
}
impl DerefMut for WipeStats {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stats
}
}
impl WipeStats {
/// Complete the current chunk.
fn complete_chunk(&mut self, start: std::time::Instant, size: u64) {
Expand All @@ -159,8 +188,8 @@ impl Wiper {
})
}
/// Wipe the bdev at the given byte offset and byte size.
pub async fn wipe(&self, offset: u64, size: u64) -> Result<(), Error> {
match self.wipe_method {
pub async fn wipe(&mut self, offset: u64, size: u64) -> Result<(), Error> {
match &mut self.wipe_method {
WipeMethod::None => Ok(()),
WipeMethod::WriteZeroes => {
self.bdev.write_zeroes_at(offset, size).await.map_err(
Expand All @@ -174,6 +203,22 @@ impl Wiper {
method: self.wipe_method,
})
}
WipeMethod::CkSum(CkSumMethod::Crc32 {
crc32c,
}) => {
let mut buffer = self.bdev.dma_malloc(size).unwrap();
self.bdev.read_at(offset, &mut buffer).await?;

*crc32c = unsafe {
spdk_rs::libspdk::spdk_crc32c_update(
buffer.as_ptr(),
size,
*crc32c,
)
};

Ok(())
}
}?;
Ok(())
}
Expand All @@ -188,6 +233,7 @@ impl Wiper {
method: wipe_method,
})
}
WipeMethod::CkSum(_) => Ok(wipe_method),
}
}
}
Expand Down Expand Up @@ -247,16 +293,31 @@ impl<S: NotifyStream> StreamedWiper<S> {
) -> Result<(), Error> {
self.wipe_with_abort(offset, size).await?;

self.stats.complete_chunk(start, size);
self.complete_chunk(start, size);

self.notify()
}

/// Complete the current chunk.
fn complete_chunk(&mut self, start: std::time::Instant, size: u64) {
self.stats.complete_chunk(start, size);
if let WipeMethod::CkSum(CkSumMethod::Crc32 {
crc32c,
}) = &mut self.wiper.wipe_method
{
// Finalize CRC by inverting all bits.
if self.stats.remaining_chunks == 0 {
*crc32c ^= spdk_rs::libspdk::SPDK_CRC32C_XOR;
}
self.stats.cksum_crc32c = Some(*crc32c);
}
}

/// Wipe the bdev at the given byte offset and byte size.
/// Uses the abort checker allowing us to stop early if a client disconnects
/// or if the process is being shutdown.
async fn wipe_with_abort(
&self,
&mut self,
offset: u64,
size: u64,
) -> Result<(), Error> {
Expand Down Expand Up @@ -324,6 +385,8 @@ pub(crate) struct WipeIterator {
pub(crate) remaining_chunks: u64,
/// Number of chunks to wipe.
pub(crate) total_chunks: u64,
/// The checksum of the bdev.
pub(crate) cksum_crc32c: Option<u32>,
}
impl WipeIterator {
fn new(
Expand Down Expand Up @@ -366,6 +429,7 @@ impl WipeIterator {
wiped_bytes: 0,
remaining_chunks: chunks,
total_chunks: chunks,
cksum_crc32c: None,
})
}
fn complete_chunk(&mut self, size: u64) {
Expand Down
34 changes: 32 additions & 2 deletions io-engine/src/eventing/host_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,48 @@ use events_api::event::{
};

use crate::{
core::MayastorEnvironment,
bdev::Nexus,
core::{LogicalVolume, MayastorEnvironment},
eventing::{EventMetaGen, EventWithMeta},
lvs::Lvol,
subsys::NvmfSubsystem,
};
use spdk_rs::NvmfController;

/// A trait definition to include target details in host events meta data
pub(crate) trait HostTargetMeta {
/// Add target detaails to host event meta
fn host_target_meta(&self, meta: EventMeta) -> EventMeta;
}

impl<'n> HostTargetMeta for Nexus<'n> {
fn host_target_meta(&self, mut meta: EventMeta) -> EventMeta {
if let Some(source) = meta.source {
let event_source =
source.with_target_data("nexus", &self.uuid().to_string());
meta.source = Some(event_source);
}
meta
}
}

impl HostTargetMeta for Lvol {
fn host_target_meta(&self, mut meta: EventMeta) -> EventMeta {
if let Some(source) = meta.source {
let event_source = source.with_target_data("replica", &self.uuid());
meta.source = Some(event_source);
}
meta
}
}

impl EventMetaGen for NvmfSubsystem {
fn meta(&self) -> EventMeta {
let nqn = self.get_nqn();
let event_source = EventSource::new(
MayastorEnvironment::global_or_default().node_name,
)
.with_subsystem_data(&self.get_nqn());
.with_subsystem_data(&nqn);

EventMeta::from_source(event_source)
}
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/eventing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod host_events;
pub(crate) mod host_events;
mod nexus_child_events;
pub(crate) mod nexus_events;
mod pool_events;
Expand Down
Loading

0 comments on commit 5ccb082

Please sign in to comment.