diff --git a/.github/workflows/pr-submodule-branch.yml b/.github/workflows/pr-submodule-branch.yml index a5c8eb8d2..ec49ed864 100644 --- a/.github/workflows/pr-submodule-branch.yml +++ b/.github/workflows/pr-submodule-branch.yml @@ -12,6 +12,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + with: + fetch-depth: 0 + submodules: recursive - name: Check root submodules branch run: | pr_branch="${{ github.event.pull_request.base.ref }}" diff --git a/io-engine/src/bdev/lvs.rs b/io-engine/src/bdev/lvs.rs index 7012a4822..1c0e83767 100644 --- a/io-engine/src/bdev/lvs.rs +++ b/io-engine/src/bdev/lvs.rs @@ -269,7 +269,7 @@ impl Lvs { name: bdev.name().into(), })?; - let wiper = crate::core::wiper::Wiper::new( + let mut wiper = crate::core::wiper::Wiper::new( hdl, crate::core::wiper::WipeMethod::WriteZeroes, ) diff --git a/io-engine/src/bin/io-engine-client/v1/test_cli.rs b/io-engine/src/bin/io-engine-client/v1/test_cli.rs index bf94b0b8a..2ee317754 100644 --- a/io-engine/src/bin/io-engine-client/v1/test_cli.rs +++ b/io-engine/src/bin/io-engine-client/v1/test_cli.rs @@ -16,6 +16,8 @@ use strum_macros::{AsRefStr, EnumString, EnumVariantNames}; use tonic::Status; pub fn subcommands() -> Command { + let features = Command::new("features").about("Get the test features"); + let inject = Command::new("inject") .about("manage fault injections") .arg( @@ -89,6 +91,7 @@ pub fn subcommands() -> Command { .subcommand_required(true) .arg_required_else_help(true) .about("Test management") + .subcommand(features) .subcommand(inject) .subcommand(wipe) } @@ -106,11 +109,18 @@ impl Resource { #[derive(EnumString, EnumVariantNames)] #[strum(serialize_all = "PascalCase")] +enum CheckSumAlg { + Crc32c, +} + +#[derive(EnumString, EnumVariantNames, Clone, Copy)] +#[strum(serialize_all = "PascalCase")] enum WipeMethod { None, WriteZeroes, Unmap, WritePattern, + CheckSum, } impl WipeMethod { fn methods() -> &'static [&'static str] { @@ -124,13 +134,20 @@ impl From for v1_rpc::test::wipe_options::WipeMethod { WipeMethod::WriteZeroes => Self::WriteZeroes, WipeMethod::Unmap => Self::Unmap, WipeMethod::WritePattern => Self::WritePattern, + WipeMethod::CheckSum => Self::Checksum, } } } +impl From for v1_rpc::test::wipe_options::CheckSumAlgorithm { + fn from(_: WipeMethod) -> Self { + v1_rpc::test::wipe_options::CheckSumAlgorithm::Crc32c + } +} pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { match matches.subcommand().unwrap() { ("inject", args) => injections(ctx, args).await, + ("features", args) => features(ctx, args).await, ("wipe", args) => wipe(ctx, args).await, (cmd, _) => { Err(Status::not_found(format!("command {cmd} does not exist"))) @@ -139,6 +156,23 @@ pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { } } +async fn features( + mut ctx: Context, + _matches: &ArgMatches, +) -> crate::Result<()> { + let response = ctx.v1.test.get_features(()).await.context(GrpcStatus)?; + let features = response.into_inner(); + match ctx.output { + OutputFormat::Json => { + println!("{}", serde_json::to_string_pretty(&features).unwrap()); + } + OutputFormat::Default => { + println!("{features:#?}"); + } + } + Ok(()) +} + async fn wipe(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { let resource = matches .get_one::("resource") @@ -191,6 +225,7 @@ async fn replica_wipe( ) .map_err(|s| Status::invalid_argument(format!("Bad size '{s}'"))) .context(GrpcStatus)?; + let response = ctx .v1 .test @@ -203,6 +238,10 @@ async fn replica_wipe( method, ) as i32, write_pattern: None, + cksum_alg: + v1_rpc::test::wipe_options::CheckSumAlgorithm::from( + method, + ) as i32, }), chunk_size: chunk_size.get_bytes() as u64, }), @@ -213,7 +252,7 @@ async fn replica_wipe( let mut resp = response.into_inner(); fn bandwidth(response: &v1_rpc::test::WipeReplicaResponse) -> String { - let unknown = "??".to_string(); + let unknown = String::new(); let Some(Ok(elapsed)) = response .since .clone() @@ -233,6 +272,18 @@ async fn replica_wipe( ) } + fn checksum(response: &v1_rpc::test::WipeReplicaResponse) -> String { + response + .checksum + .clone() + .map(|c| match c { + v1_rpc::test::wipe_replica_response::Checksum::Crc32(crc) => { + format!("{crc:#x}") + } + }) + .unwrap_or_default() + } + match ctx.output { OutputFormat::Json => { while let Some(response) = resp.next().await { @@ -257,13 +308,18 @@ async fn replica_wipe( "WIPED_CHUNKS", "REMAINING_BYTES", "BANDWIDTH", + "CHECKSUM", ]; let (s, r) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { while let Some(response) = resp.next().await { let response = response.map(|response| { - let bandwidth = bandwidth(&response); + // back fill with spaces with ensure checksum aligns + // with its header + let bandwidth = + format!("{: <12}", bandwidth(&response)); + let checksum = checksum(&response); vec![ response.uuid, adjust_bytes(response.total_bytes), @@ -274,6 +330,7 @@ async fn replica_wipe( response.wiped_chunks.to_string(), adjust_bytes(response.remaining_bytes), bandwidth, + checksum, ] }); s.send(response).await.unwrap(); diff --git a/io-engine/src/core/wiper.rs b/io-engine/src/core/wiper.rs index 24146bf16..9bbd91aa1 100644 --- a/io-engine/src/core/wiper.rs +++ b/io-engine/src/core/wiper.rs @@ -1,6 +1,9 @@ use crate::core::{CoreError, UntypedBdevHandle}; use snafu::Snafu; -use std::{fmt::Debug, ops::Deref}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, +}; /// The Error for the Wiper. #[derive(Clone, Debug, Snafu)] @@ -89,6 +92,22 @@ pub enum WipeMethod { /// When using WRITE_PATTERN, wipe using this 32bit write pattern, example: /// 0xDEADBEEF. WritePattern(u32), + /// Don't actually wipe, just take the checksum. + CkSum(CkSumMethod), +} + +/// Wipe method, allowing for some flexibility. +#[derive(Debug, Clone, Copy)] +pub enum CkSumMethod { + /// Don't actually wipe, just pretend. + Crc32 { crc32c: u32 }, +} +impl Default for CkSumMethod { + fn default() -> Self { + Self::Crc32 { + crc32c: spdk_rs::libspdk::SPDK_CRC32C_INITIAL, + } + } } /// Final Wipe stats. @@ -114,9 +133,14 @@ impl FinalWipeStats { }; tracing::warn!( - "Wiped {} => {:.3?} => {bandwidth}/s", + "Wiped {} => {:.3?} => {bandwidth}/s{}", self.stats.uuid, - elapsed + elapsed, + if let Some(crc) = stats.cksum_crc32c { + format!(" => {crc:#x}") + } else { + String::new() + } ); } } @@ -138,6 +162,11 @@ impl Deref for WipeStats { &self.stats } } +impl DerefMut for WipeStats { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stats + } +} impl WipeStats { /// Complete the current chunk. fn complete_chunk(&mut self, start: std::time::Instant, size: u64) { @@ -159,8 +188,8 @@ impl Wiper { }) } /// Wipe the bdev at the given byte offset and byte size. - pub async fn wipe(&self, offset: u64, size: u64) -> Result<(), Error> { - match self.wipe_method { + pub async fn wipe(&mut self, offset: u64, size: u64) -> Result<(), Error> { + match &mut self.wipe_method { WipeMethod::None => Ok(()), WipeMethod::WriteZeroes => { self.bdev.write_zeroes_at(offset, size).await.map_err( @@ -174,6 +203,22 @@ impl Wiper { method: self.wipe_method, }) } + WipeMethod::CkSum(CkSumMethod::Crc32 { + crc32c, + }) => { + let mut buffer = self.bdev.dma_malloc(size).unwrap(); + self.bdev.read_at(offset, &mut buffer).await?; + + *crc32c = unsafe { + spdk_rs::libspdk::spdk_crc32c_update( + buffer.as_ptr(), + size, + *crc32c, + ) + }; + + Ok(()) + } }?; Ok(()) } @@ -188,6 +233,7 @@ impl Wiper { method: wipe_method, }) } + WipeMethod::CkSum(_) => Ok(wipe_method), } } } @@ -247,16 +293,31 @@ impl StreamedWiper { ) -> Result<(), Error> { self.wipe_with_abort(offset, size).await?; - self.stats.complete_chunk(start, size); + self.complete_chunk(start, size); self.notify() } + /// Complete the current chunk. + fn complete_chunk(&mut self, start: std::time::Instant, size: u64) { + self.stats.complete_chunk(start, size); + if let WipeMethod::CkSum(CkSumMethod::Crc32 { + crc32c, + }) = &mut self.wiper.wipe_method + { + // Finalize CRC by inverting all bits. + if self.stats.remaining_chunks == 0 { + *crc32c ^= spdk_rs::libspdk::SPDK_CRC32C_XOR; + } + self.stats.cksum_crc32c = Some(*crc32c); + } + } + /// Wipe the bdev at the given byte offset and byte size. /// Uses the abort checker allowing us to stop early if a client disconnects /// or if the process is being shutdown. async fn wipe_with_abort( - &self, + &mut self, offset: u64, size: u64, ) -> Result<(), Error> { @@ -324,6 +385,8 @@ pub(crate) struct WipeIterator { pub(crate) remaining_chunks: u64, /// Number of chunks to wipe. pub(crate) total_chunks: u64, + /// The checksum of the bdev. + pub(crate) cksum_crc32c: Option, } impl WipeIterator { fn new( @@ -366,6 +429,7 @@ impl WipeIterator { wiped_bytes: 0, remaining_chunks: chunks, total_chunks: chunks, + cksum_crc32c: None, }) } fn complete_chunk(&mut self, size: u64) { diff --git a/io-engine/src/eventing/host_events.rs b/io-engine/src/eventing/host_events.rs index e0b548b78..76d4b37ed 100644 --- a/io-engine/src/eventing/host_events.rs +++ b/io-engine/src/eventing/host_events.rs @@ -7,18 +7,48 @@ use events_api::event::{ }; use crate::{ - core::MayastorEnvironment, + bdev::Nexus, + core::{LogicalVolume, MayastorEnvironment}, eventing::{EventMetaGen, EventWithMeta}, + lvs::Lvol, subsys::NvmfSubsystem, }; use spdk_rs::NvmfController; +/// A trait definition to include target details in host events meta data +pub(crate) trait HostTargetMeta { + /// Add target detaails to host event meta + fn host_target_meta(&self, meta: EventMeta) -> EventMeta; +} + +impl<'n> HostTargetMeta for Nexus<'n> { + fn host_target_meta(&self, mut meta: EventMeta) -> EventMeta { + if let Some(source) = meta.source { + let event_source = + source.with_target_data("nexus", &self.uuid().to_string()); + meta.source = Some(event_source); + } + meta + } +} + +impl HostTargetMeta for Lvol { + fn host_target_meta(&self, mut meta: EventMeta) -> EventMeta { + if let Some(source) = meta.source { + let event_source = source.with_target_data("replica", &self.uuid()); + meta.source = Some(event_source); + } + meta + } +} + impl EventMetaGen for NvmfSubsystem { fn meta(&self) -> EventMeta { + let nqn = self.get_nqn(); let event_source = EventSource::new( MayastorEnvironment::global_or_default().node_name, ) - .with_subsystem_data(&self.get_nqn()); + .with_subsystem_data(&nqn); EventMeta::from_source(event_source) } diff --git a/io-engine/src/eventing/mod.rs b/io-engine/src/eventing/mod.rs index 3c823a5dc..de8cd5ab2 100644 --- a/io-engine/src/eventing/mod.rs +++ b/io-engine/src/eventing/mod.rs @@ -1,4 +1,4 @@ -mod host_events; +pub(crate) mod host_events; mod nexus_child_events; pub(crate) mod nexus_events; mod pool_events; diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs index 78db89f10..2ad7f2f95 100644 --- a/io-engine/src/grpc/v1/test.rs +++ b/io-engine/src/grpc/v1/test.rs @@ -14,6 +14,7 @@ use io_engine_api::{ v1::test::{ wipe_options::WipeMethod, wipe_replica_request, + wipe_replica_response, StreamWipeOptions, TestRpc, WipeReplicaRequest, @@ -55,6 +56,23 @@ impl TestRpc for TestService { type WipeReplicaStream = ReceiverStream>; + /// Get all the features supported by the test service. + async fn get_features( + &self, + _request: Request<()>, + ) -> GrpcResult { + GrpcResult::Ok(tonic::Response::new(v1::test::TestFeatures { + wipe_methods: vec![ + v1::test::wipe_options::WipeMethod::None as i32, + v1::test::wipe_options::WipeMethod::WriteZeroes as i32, + v1::test::wipe_options::WipeMethod::Checksum as i32, + ], + cksum_algs: vec![ + v1::test::wipe_options::CheckSumAlgorithm::Crc32c as i32, + ], + })) + } + #[named] async fn wipe_replica( &self, @@ -248,6 +266,11 @@ impl TryFrom<&Option> options.write_pattern.unwrap_or(0xdeadbeef), ) } + WipeMethod::Checksum => { + crate::core::wiper::WipeMethod::CkSum( + crate::core::wiper::CkSumMethod::default(), + ) + } }) .map_err(|error| { tonic::Status::invalid_argument(error.to_string()) @@ -271,6 +294,9 @@ impl From<&WipeStats> for WipeReplicaResponse { wiped_chunks: value.wiped_chunks, remaining_bytes: value.total_bytes - value.wiped_bytes, since: value.since.and_then(|d| TryInto::try_into(d).ok()), + checksum: value + .cksum_crc32c + .map(wipe_replica_response::Checksum::Crc32), } } } diff --git a/io-engine/src/subsys/nvmf/subsystem.rs b/io-engine/src/subsys/nvmf/subsystem.rs index e716dfd4c..e913580b2 100644 --- a/io-engine/src/subsys/nvmf/subsystem.rs +++ b/io-engine/src/subsys/nvmf/subsystem.rs @@ -64,7 +64,7 @@ use crate::{ bdev::{nexus::NEXUS_MODULE_NAME, nvmx::NVME_CONTROLLERS, Nexus}, constants::{NVME_CONTROLLER_MODEL_ID, NVME_NQN_PREFIX}, core::{Bdev, Reactors, UntypedBdev}, - eventing::{EventMetaGen, EventWithMeta}, + eventing::{host_events::HostTargetMeta, EventMetaGen, EventWithMeta}, ffihelper::{cb_arg, done_cb, AsStr, FfiResult, IntoCString}, lvs::Lvol, subsys::{ @@ -73,7 +73,6 @@ use crate::{ Config, }, }; - use events_api::event::EventAction; /// TODO @@ -256,9 +255,15 @@ impl NvmfSubsystem { ); } + let event_meta = match nqn_tgt { + NqnTarget::Nexus(n) => n.host_target_meta(s.meta()), + NqnTarget::Replica(ref r) => r.host_target_meta(s.meta()), + NqnTarget::None => s.meta(), + }; + match event { NvmfSubsystemEvent::HostConnect(c) => { - c.event(EventAction::NvmeConnect, s.meta()).generate(); + c.event(EventAction::NvmeConnect, event_meta).generate(); match nqn_tgt { NqnTarget::Nexus(n) => s.host_connect_nexus(c, n), @@ -267,7 +272,7 @@ impl NvmfSubsystem { } } NvmfSubsystemEvent::HostDisconnect(c) => { - c.event(EventAction::NvmeDisconnect, s.meta()).generate(); + c.event(EventAction::NvmeDisconnect, event_meta).generate(); match nqn_tgt { NqnTarget::Nexus(n) => s.host_disconnect_nexus(c, n), @@ -276,7 +281,7 @@ impl NvmfSubsystem { } } NvmfSubsystemEvent::HostKeepAliveTimeout(c) => { - c.event(EventAction::NvmeKeepAliveTimeout, s.meta()) + c.event(EventAction::NvmeKeepAliveTimeout, event_meta) .generate(); match nqn_tgt { diff --git a/io-engine/tests/wipe.rs b/io-engine/tests/wipe.rs index 84e07e633..c03974701 100644 --- a/io-engine/tests/wipe.rs +++ b/io-engine/tests/wipe.rs @@ -283,6 +283,7 @@ async fn issue_wipe_replica( options: Some(v1_rpc::test::WipeOptions { wipe_method: wipe_method as i32, write_pattern: None, + cksum_alg: 0, }), chunk_size, }), diff --git a/spdk-rs b/spdk-rs index 6522dc885..e500c69e7 160000 --- a/spdk-rs +++ b/spdk-rs @@ -1 +1 @@ -Subproject commit 6522dc8858311fd206a6add2064e75855591ca4d +Subproject commit e500c69e7148dd6e495a484de1886405bc07f260 diff --git a/utils/dependencies b/utils/dependencies index b591fb073..36fa59876 160000 --- a/utils/dependencies +++ b/utils/dependencies @@ -1 +1 @@ -Subproject commit b591fb07351f3600a0189c3ee817858d0e11ae8f +Subproject commit 36fa598760e0528ee9561431552a1b571238e935