From aed89e342286c3ff3b3891d2e0a4115168ea4b11 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Thu, 7 Dec 2023 19:37:38 +0000 Subject: [PATCH 1/4] feat(wipe/cksum): add replica cksum Makes use of the existing wipe infra to cksum a bdev. This means we keep existing bdev iteration and streaming notifications with a very simple change. todo: rename wipe and abstract wipe and hash a bit better Currently we add support for crc32c only Signed-off-by: Tiago Castro --- io-engine/src/bdev/lvs.rs | 2 +- io-engine/src/core/wiper.rs | 78 +++++++++++++++++++++++++++++++---- io-engine/src/grpc/v1/test.rs | 9 ++++ io-engine/tests/wipe.rs | 1 + spdk-rs | 2 +- utils/dependencies | 2 +- 6 files changed, 84 insertions(+), 10 deletions(-) diff --git a/io-engine/src/bdev/lvs.rs b/io-engine/src/bdev/lvs.rs index 7012a4822..1c0e83767 100644 --- a/io-engine/src/bdev/lvs.rs +++ b/io-engine/src/bdev/lvs.rs @@ -269,7 +269,7 @@ impl Lvs { name: bdev.name().into(), })?; - let wiper = crate::core::wiper::Wiper::new( + let mut wiper = crate::core::wiper::Wiper::new( hdl, crate::core::wiper::WipeMethod::WriteZeroes, ) diff --git a/io-engine/src/core/wiper.rs b/io-engine/src/core/wiper.rs index 24146bf16..9bbd91aa1 100644 --- a/io-engine/src/core/wiper.rs +++ b/io-engine/src/core/wiper.rs @@ -1,6 +1,9 @@ use crate::core::{CoreError, UntypedBdevHandle}; use snafu::Snafu; -use std::{fmt::Debug, ops::Deref}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, +}; /// The Error for the Wiper. #[derive(Clone, Debug, Snafu)] @@ -89,6 +92,22 @@ pub enum WipeMethod { /// When using WRITE_PATTERN, wipe using this 32bit write pattern, example: /// 0xDEADBEEF. WritePattern(u32), + /// Don't actually wipe, just take the checksum. + CkSum(CkSumMethod), +} + +/// Wipe method, allowing for some flexibility. +#[derive(Debug, Clone, Copy)] +pub enum CkSumMethod { + /// Don't actually wipe, just pretend. + Crc32 { crc32c: u32 }, +} +impl Default for CkSumMethod { + fn default() -> Self { + Self::Crc32 { + crc32c: spdk_rs::libspdk::SPDK_CRC32C_INITIAL, + } + } } /// Final Wipe stats. @@ -114,9 +133,14 @@ impl FinalWipeStats { }; tracing::warn!( - "Wiped {} => {:.3?} => {bandwidth}/s", + "Wiped {} => {:.3?} => {bandwidth}/s{}", self.stats.uuid, - elapsed + elapsed, + if let Some(crc) = stats.cksum_crc32c { + format!(" => {crc:#x}") + } else { + String::new() + } ); } } @@ -138,6 +162,11 @@ impl Deref for WipeStats { &self.stats } } +impl DerefMut for WipeStats { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stats + } +} impl WipeStats { /// Complete the current chunk. fn complete_chunk(&mut self, start: std::time::Instant, size: u64) { @@ -159,8 +188,8 @@ impl Wiper { }) } /// Wipe the bdev at the given byte offset and byte size. - pub async fn wipe(&self, offset: u64, size: u64) -> Result<(), Error> { - match self.wipe_method { + pub async fn wipe(&mut self, offset: u64, size: u64) -> Result<(), Error> { + match &mut self.wipe_method { WipeMethod::None => Ok(()), WipeMethod::WriteZeroes => { self.bdev.write_zeroes_at(offset, size).await.map_err( @@ -174,6 +203,22 @@ impl Wiper { method: self.wipe_method, }) } + WipeMethod::CkSum(CkSumMethod::Crc32 { + crc32c, + }) => { + let mut buffer = self.bdev.dma_malloc(size).unwrap(); + self.bdev.read_at(offset, &mut buffer).await?; + + *crc32c = unsafe { + spdk_rs::libspdk::spdk_crc32c_update( + buffer.as_ptr(), + size, + *crc32c, + ) + }; + + Ok(()) + } }?; Ok(()) } @@ -188,6 +233,7 @@ impl Wiper { method: wipe_method, }) } + WipeMethod::CkSum(_) => Ok(wipe_method), } } } @@ -247,16 +293,31 @@ impl StreamedWiper { ) -> Result<(), Error> { self.wipe_with_abort(offset, size).await?; - self.stats.complete_chunk(start, size); + self.complete_chunk(start, size); self.notify() } + /// Complete the current chunk. + fn complete_chunk(&mut self, start: std::time::Instant, size: u64) { + self.stats.complete_chunk(start, size); + if let WipeMethod::CkSum(CkSumMethod::Crc32 { + crc32c, + }) = &mut self.wiper.wipe_method + { + // Finalize CRC by inverting all bits. + if self.stats.remaining_chunks == 0 { + *crc32c ^= spdk_rs::libspdk::SPDK_CRC32C_XOR; + } + self.stats.cksum_crc32c = Some(*crc32c); + } + } + /// Wipe the bdev at the given byte offset and byte size. /// Uses the abort checker allowing us to stop early if a client disconnects /// or if the process is being shutdown. async fn wipe_with_abort( - &self, + &mut self, offset: u64, size: u64, ) -> Result<(), Error> { @@ -324,6 +385,8 @@ pub(crate) struct WipeIterator { pub(crate) remaining_chunks: u64, /// Number of chunks to wipe. pub(crate) total_chunks: u64, + /// The checksum of the bdev. + pub(crate) cksum_crc32c: Option, } impl WipeIterator { fn new( @@ -366,6 +429,7 @@ impl WipeIterator { wiped_bytes: 0, remaining_chunks: chunks, total_chunks: chunks, + cksum_crc32c: None, }) } fn complete_chunk(&mut self, size: u64) { diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs index 78db89f10..487b9a265 100644 --- a/io-engine/src/grpc/v1/test.rs +++ b/io-engine/src/grpc/v1/test.rs @@ -14,6 +14,7 @@ use io_engine_api::{ v1::test::{ wipe_options::WipeMethod, wipe_replica_request, + wipe_replica_response, StreamWipeOptions, TestRpc, WipeReplicaRequest, @@ -248,6 +249,11 @@ impl TryFrom<&Option> options.write_pattern.unwrap_or(0xdeadbeef), ) } + WipeMethod::Checksum => { + crate::core::wiper::WipeMethod::CkSum( + crate::core::wiper::CkSumMethod::default(), + ) + } }) .map_err(|error| { tonic::Status::invalid_argument(error.to_string()) @@ -271,6 +277,9 @@ impl From<&WipeStats> for WipeReplicaResponse { wiped_chunks: value.wiped_chunks, remaining_bytes: value.total_bytes - value.wiped_bytes, since: value.since.and_then(|d| TryInto::try_into(d).ok()), + checksum: value + .cksum_crc32c + .map(wipe_replica_response::Checksum::Crc32), } } } diff --git a/io-engine/tests/wipe.rs b/io-engine/tests/wipe.rs index 84e07e633..c03974701 100644 --- a/io-engine/tests/wipe.rs +++ b/io-engine/tests/wipe.rs @@ -283,6 +283,7 @@ async fn issue_wipe_replica( options: Some(v1_rpc::test::WipeOptions { wipe_method: wipe_method as i32, write_pattern: None, + cksum_alg: 0, }), chunk_size, }), diff --git a/spdk-rs b/spdk-rs index 6522dc885..e500c69e7 160000 --- a/spdk-rs +++ b/spdk-rs @@ -1 +1 @@ -Subproject commit 6522dc8858311fd206a6add2064e75855591ca4d +Subproject commit e500c69e7148dd6e495a484de1886405bc07f260 diff --git a/utils/dependencies b/utils/dependencies index 2c0e0c1e6..a8089e73d 160000 --- a/utils/dependencies +++ b/utils/dependencies @@ -1 +1 @@ -Subproject commit 2c0e0c1e65381949ab60cb5f7329305105136822 +Subproject commit a8089e73d8a7ecc3642a63d9d6c4d432a1b1ce74 From 0230be94b053b549fdcdaafc833dba7dad3c1720 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Thu, 7 Dec 2023 19:40:37 +0000 Subject: [PATCH 2/4] feat(client/cksum): add cksum to the io-engine client Updates method argument with checksum. Adds the cksum as another column. Signed-off-by: Tiago Castro --- .../src/bin/io-engine-client/v1/test_cli.rs | 39 ++++++++++++++++++- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/io-engine/src/bin/io-engine-client/v1/test_cli.rs b/io-engine/src/bin/io-engine-client/v1/test_cli.rs index bf94b0b8a..c8484bbe7 100644 --- a/io-engine/src/bin/io-engine-client/v1/test_cli.rs +++ b/io-engine/src/bin/io-engine-client/v1/test_cli.rs @@ -106,11 +106,18 @@ impl Resource { #[derive(EnumString, EnumVariantNames)] #[strum(serialize_all = "PascalCase")] +enum CheckSumAlg { + Crc32c, +} + +#[derive(EnumString, EnumVariantNames, Clone, Copy)] +#[strum(serialize_all = "PascalCase")] enum WipeMethod { None, WriteZeroes, Unmap, WritePattern, + CheckSum, } impl WipeMethod { fn methods() -> &'static [&'static str] { @@ -124,9 +131,15 @@ impl From for v1_rpc::test::wipe_options::WipeMethod { WipeMethod::WriteZeroes => Self::WriteZeroes, WipeMethod::Unmap => Self::Unmap, WipeMethod::WritePattern => Self::WritePattern, + WipeMethod::CheckSum => Self::Checksum, } } } +impl From for v1_rpc::test::wipe_options::CheckSumAlgorithm { + fn from(_: WipeMethod) -> Self { + v1_rpc::test::wipe_options::CheckSumAlgorithm::Crc32c + } +} pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { match matches.subcommand().unwrap() { @@ -203,6 +216,10 @@ async fn replica_wipe( method, ) as i32, write_pattern: None, + cksum_alg: + v1_rpc::test::wipe_options::CheckSumAlgorithm::from( + method, + ) as i32, }), chunk_size: chunk_size.get_bytes() as u64, }), @@ -213,7 +230,7 @@ async fn replica_wipe( let mut resp = response.into_inner(); fn bandwidth(response: &v1_rpc::test::WipeReplicaResponse) -> String { - let unknown = "??".to_string(); + let unknown = String::new(); let Some(Ok(elapsed)) = response .since .clone() @@ -233,6 +250,18 @@ async fn replica_wipe( ) } + fn checksum(response: &v1_rpc::test::WipeReplicaResponse) -> String { + response + .checksum + .clone() + .map(|c| match c { + v1_rpc::test::wipe_replica_response::Checksum::Crc32(crc) => { + format!("{crc:#x}") + } + }) + .unwrap_or_default() + } + match ctx.output { OutputFormat::Json => { while let Some(response) = resp.next().await { @@ -257,13 +286,18 @@ async fn replica_wipe( "WIPED_CHUNKS", "REMAINING_BYTES", "BANDWIDTH", + "CHECKSUM", ]; let (s, r) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { while let Some(response) = resp.next().await { let response = response.map(|response| { - let bandwidth = bandwidth(&response); + // back fill with spaces with ensure checksum aligns + // with its header + let bandwidth = + format!("{: <12}", bandwidth(&response)); + let checksum = checksum(&response); vec![ response.uuid, adjust_bytes(response.total_bytes), @@ -274,6 +308,7 @@ async fn replica_wipe( response.wiped_chunks.to_string(), adjust_bytes(response.remaining_bytes), bandwidth, + checksum, ] }); s.send(response).await.unwrap(); From 26c77c7422aa75d11e07a21037a07cc960fc2d14 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Fri, 8 Dec 2023 16:05:10 +0000 Subject: [PATCH 3/4] feat(test/features): export test features Signed-off-by: Tiago Castro --- io-engine/src/grpc/v1/test.rs | 17 +++++++++++++++++ utils/dependencies | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs index 487b9a265..2ad7f2f95 100644 --- a/io-engine/src/grpc/v1/test.rs +++ b/io-engine/src/grpc/v1/test.rs @@ -56,6 +56,23 @@ impl TestRpc for TestService { type WipeReplicaStream = ReceiverStream>; + /// Get all the features supported by the test service. + async fn get_features( + &self, + _request: Request<()>, + ) -> GrpcResult { + GrpcResult::Ok(tonic::Response::new(v1::test::TestFeatures { + wipe_methods: vec![ + v1::test::wipe_options::WipeMethod::None as i32, + v1::test::wipe_options::WipeMethod::WriteZeroes as i32, + v1::test::wipe_options::WipeMethod::Checksum as i32, + ], + cksum_algs: vec![ + v1::test::wipe_options::CheckSumAlgorithm::Crc32c as i32, + ], + })) + } + #[named] async fn wipe_replica( &self, diff --git a/utils/dependencies b/utils/dependencies index a8089e73d..8e79cb402 160000 --- a/utils/dependencies +++ b/utils/dependencies @@ -1 +1 @@ -Subproject commit a8089e73d8a7ecc3642a63d9d6c4d432a1b1ce74 +Subproject commit 8e79cb402f4c71acca1fcd0be2e1800c73edd657 From 6a025b9ccc7f249f152e600a4a8011670252465e Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Fri, 8 Dec 2023 16:06:36 +0000 Subject: [PATCH 4/4] feat(client/test): expose test features Signed-off-by: Tiago Castro --- .../src/bin/io-engine-client/v1/test_cli.rs | 22 +++++++++++++++++++ utils/dependencies | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/io-engine/src/bin/io-engine-client/v1/test_cli.rs b/io-engine/src/bin/io-engine-client/v1/test_cli.rs index c8484bbe7..2ee317754 100644 --- a/io-engine/src/bin/io-engine-client/v1/test_cli.rs +++ b/io-engine/src/bin/io-engine-client/v1/test_cli.rs @@ -16,6 +16,8 @@ use strum_macros::{AsRefStr, EnumString, EnumVariantNames}; use tonic::Status; pub fn subcommands() -> Command { + let features = Command::new("features").about("Get the test features"); + let inject = Command::new("inject") .about("manage fault injections") .arg( @@ -89,6 +91,7 @@ pub fn subcommands() -> Command { .subcommand_required(true) .arg_required_else_help(true) .about("Test management") + .subcommand(features) .subcommand(inject) .subcommand(wipe) } @@ -144,6 +147,7 @@ impl From for v1_rpc::test::wipe_options::CheckSumAlgorithm { pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { match matches.subcommand().unwrap() { ("inject", args) => injections(ctx, args).await, + ("features", args) => features(ctx, args).await, ("wipe", args) => wipe(ctx, args).await, (cmd, _) => { Err(Status::not_found(format!("command {cmd} does not exist"))) @@ -152,6 +156,23 @@ pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { } } +async fn features( + mut ctx: Context, + _matches: &ArgMatches, +) -> crate::Result<()> { + let response = ctx.v1.test.get_features(()).await.context(GrpcStatus)?; + let features = response.into_inner(); + match ctx.output { + OutputFormat::Json => { + println!("{}", serde_json::to_string_pretty(&features).unwrap()); + } + OutputFormat::Default => { + println!("{features:#?}"); + } + } + Ok(()) +} + async fn wipe(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { let resource = matches .get_one::("resource") @@ -204,6 +225,7 @@ async fn replica_wipe( ) .map_err(|s| Status::invalid_argument(format!("Bad size '{s}'"))) .context(GrpcStatus)?; + let response = ctx .v1 .test diff --git a/utils/dependencies b/utils/dependencies index 8e79cb402..dca05e39e 160000 --- a/utils/dependencies +++ b/utils/dependencies @@ -1 +1 @@ -Subproject commit 8e79cb402f4c71acca1fcd0be2e1800c73edd657 +Subproject commit dca05e39e032c15275a48e00feb0983b152338c1