Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Cksum a replica #1557

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion io-engine/src/bdev/lvs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Lvs {
name: bdev.name().into(),
})?;

let wiper = crate::core::wiper::Wiper::new(
let mut wiper = crate::core::wiper::Wiper::new(
hdl,
crate::core::wiper::WipeMethod::WriteZeroes,
)
Expand Down
61 changes: 59 additions & 2 deletions io-engine/src/bin/io-engine-client/v1/test_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use strum_macros::{AsRefStr, EnumString, EnumVariantNames};
use tonic::Status;

pub fn subcommands() -> Command {
let features = Command::new("features").about("Get the test features");

let inject = Command::new("inject")
.about("manage fault injections")
.arg(
Expand Down Expand Up @@ -89,6 +91,7 @@ pub fn subcommands() -> Command {
.subcommand_required(true)
.arg_required_else_help(true)
.about("Test management")
.subcommand(features)
.subcommand(inject)
.subcommand(wipe)
}
Expand All @@ -106,11 +109,18 @@ impl Resource {

#[derive(EnumString, EnumVariantNames)]
#[strum(serialize_all = "PascalCase")]
enum CheckSumAlg {
Crc32c,
}

#[derive(EnumString, EnumVariantNames, Clone, Copy)]
#[strum(serialize_all = "PascalCase")]
enum WipeMethod {
None,
WriteZeroes,
Unmap,
WritePattern,
CheckSum,
}
impl WipeMethod {
fn methods() -> &'static [&'static str] {
Expand All @@ -124,13 +134,20 @@ impl From<WipeMethod> for v1_rpc::test::wipe_options::WipeMethod {
WipeMethod::WriteZeroes => Self::WriteZeroes,
WipeMethod::Unmap => Self::Unmap,
WipeMethod::WritePattern => Self::WritePattern,
WipeMethod::CheckSum => Self::Checksum,
}
}
}
impl From<WipeMethod> for v1_rpc::test::wipe_options::CheckSumAlgorithm {
fn from(_: WipeMethod) -> Self {
v1_rpc::test::wipe_options::CheckSumAlgorithm::Crc32c
}
}

pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
match matches.subcommand().unwrap() {
("inject", args) => injections(ctx, args).await,
("features", args) => features(ctx, args).await,
("wipe", args) => wipe(ctx, args).await,
(cmd, _) => {
Err(Status::not_found(format!("command {cmd} does not exist")))
Expand All @@ -139,6 +156,23 @@ pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
}
}

async fn features(
mut ctx: Context,
_matches: &ArgMatches,
) -> crate::Result<()> {
let response = ctx.v1.test.get_features(()).await.context(GrpcStatus)?;
let features = response.into_inner();
match ctx.output {
OutputFormat::Json => {
println!("{}", serde_json::to_string_pretty(&features).unwrap());
}
OutputFormat::Default => {
println!("{features:#?}");
}
}
Ok(())
}

async fn wipe(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
let resource = matches
.get_one::<String>("resource")
Expand Down Expand Up @@ -191,6 +225,7 @@ async fn replica_wipe(
)
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;

let response = ctx
.v1
.test
Expand All @@ -203,6 +238,10 @@ async fn replica_wipe(
method,
) as i32,
write_pattern: None,
cksum_alg:
v1_rpc::test::wipe_options::CheckSumAlgorithm::from(
method,
) as i32,
}),
chunk_size: chunk_size.get_bytes() as u64,
}),
Expand All @@ -213,7 +252,7 @@ async fn replica_wipe(
let mut resp = response.into_inner();

fn bandwidth(response: &v1_rpc::test::WipeReplicaResponse) -> String {
let unknown = "??".to_string();
let unknown = String::new();
let Some(Ok(elapsed)) = response
.since
.clone()
Expand All @@ -233,6 +272,18 @@ async fn replica_wipe(
)
}

fn checksum(response: &v1_rpc::test::WipeReplicaResponse) -> String {
response
.checksum
.clone()
.map(|c| match c {
v1_rpc::test::wipe_replica_response::Checksum::Crc32(crc) => {
format!("{crc:#x}")
}
})
.unwrap_or_default()
}

match ctx.output {
OutputFormat::Json => {
while let Some(response) = resp.next().await {
Expand All @@ -257,13 +308,18 @@ async fn replica_wipe(
"WIPED_CHUNKS",
"REMAINING_BYTES",
"BANDWIDTH",
"CHECKSUM",
];

let (s, r) = tokio::sync::mpsc::channel(10);
tokio::spawn(async move {
while let Some(response) = resp.next().await {
let response = response.map(|response| {
let bandwidth = bandwidth(&response);
// back fill with spaces with ensure checksum aligns
// with its header
let bandwidth =
format!("{: <12}", bandwidth(&response));
let checksum = checksum(&response);
vec![
response.uuid,
adjust_bytes(response.total_bytes),
Expand All @@ -274,6 +330,7 @@ async fn replica_wipe(
response.wiped_chunks.to_string(),
adjust_bytes(response.remaining_bytes),
bandwidth,
checksum,
]
});
s.send(response).await.unwrap();
Expand Down
78 changes: 71 additions & 7 deletions io-engine/src/core/wiper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::core::{CoreError, UntypedBdevHandle};
use snafu::Snafu;
use std::{fmt::Debug, ops::Deref};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};

/// The Error for the Wiper.
#[derive(Clone, Debug, Snafu)]
Expand Down Expand Up @@ -89,6 +92,22 @@ pub enum WipeMethod {
/// When using WRITE_PATTERN, wipe using this 32bit write pattern, example:
/// 0xDEADBEEF.
WritePattern(u32),
/// Don't actually wipe, just take the checksum.
CkSum(CkSumMethod),
}

/// Wipe method, allowing for some flexibility.
#[derive(Debug, Clone, Copy)]
pub enum CkSumMethod {
/// Don't actually wipe, just pretend.
Crc32 { crc32c: u32 },
}
impl Default for CkSumMethod {
fn default() -> Self {
Self::Crc32 {
crc32c: spdk_rs::libspdk::SPDK_CRC32C_INITIAL,
}
}
}

/// Final Wipe stats.
Expand All @@ -114,9 +133,14 @@ impl FinalWipeStats {
};

tracing::warn!(
"Wiped {} => {:.3?} => {bandwidth}/s",
"Wiped {} => {:.3?} => {bandwidth}/s{}",
self.stats.uuid,
elapsed
elapsed,
if let Some(crc) = stats.cksum_crc32c {
format!(" => {crc:#x}")
} else {
String::new()
}
);
}
}
Expand All @@ -138,6 +162,11 @@ impl Deref for WipeStats {
&self.stats
}
}
impl DerefMut for WipeStats {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stats
}
}
impl WipeStats {
/// Complete the current chunk.
fn complete_chunk(&mut self, start: std::time::Instant, size: u64) {
Expand All @@ -159,8 +188,8 @@ impl Wiper {
})
}
/// Wipe the bdev at the given byte offset and byte size.
pub async fn wipe(&self, offset: u64, size: u64) -> Result<(), Error> {
match self.wipe_method {
pub async fn wipe(&mut self, offset: u64, size: u64) -> Result<(), Error> {
match &mut self.wipe_method {
WipeMethod::None => Ok(()),
WipeMethod::WriteZeroes => {
self.bdev.write_zeroes_at(offset, size).await.map_err(
Expand All @@ -174,6 +203,22 @@ impl Wiper {
method: self.wipe_method,
})
}
WipeMethod::CkSum(CkSumMethod::Crc32 {
crc32c,
}) => {
let mut buffer = self.bdev.dma_malloc(size).unwrap();
self.bdev.read_at(offset, &mut buffer).await?;

*crc32c = unsafe {
spdk_rs::libspdk::spdk_crc32c_update(
buffer.as_ptr(),
size,
*crc32c,
)
};

Ok(())
}
}?;
Ok(())
}
Expand All @@ -188,6 +233,7 @@ impl Wiper {
method: wipe_method,
})
}
WipeMethod::CkSum(_) => Ok(wipe_method),
}
}
}
Expand Down Expand Up @@ -247,16 +293,31 @@ impl<S: NotifyStream> StreamedWiper<S> {
) -> Result<(), Error> {
self.wipe_with_abort(offset, size).await?;

self.stats.complete_chunk(start, size);
self.complete_chunk(start, size);

self.notify()
}

/// Complete the current chunk.
fn complete_chunk(&mut self, start: std::time::Instant, size: u64) {
self.stats.complete_chunk(start, size);
if let WipeMethod::CkSum(CkSumMethod::Crc32 {
crc32c,
}) = &mut self.wiper.wipe_method
{
// Finalize CRC by inverting all bits.
if self.stats.remaining_chunks == 0 {
*crc32c ^= spdk_rs::libspdk::SPDK_CRC32C_XOR;
}
self.stats.cksum_crc32c = Some(*crc32c);
}
}

/// Wipe the bdev at the given byte offset and byte size.
/// Uses the abort checker allowing us to stop early if a client disconnects
/// or if the process is being shutdown.
async fn wipe_with_abort(
&self,
&mut self,
offset: u64,
size: u64,
) -> Result<(), Error> {
Expand Down Expand Up @@ -324,6 +385,8 @@ pub(crate) struct WipeIterator {
pub(crate) remaining_chunks: u64,
/// Number of chunks to wipe.
pub(crate) total_chunks: u64,
/// The checksum of the bdev.
pub(crate) cksum_crc32c: Option<u32>,
}
impl WipeIterator {
fn new(
Expand Down Expand Up @@ -366,6 +429,7 @@ impl WipeIterator {
wiped_bytes: 0,
remaining_chunks: chunks,
total_chunks: chunks,
cksum_crc32c: None,
})
}
fn complete_chunk(&mut self, size: u64) {
Expand Down
26 changes: 26 additions & 0 deletions io-engine/src/grpc/v1/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use io_engine_api::{
v1::test::{
wipe_options::WipeMethod,
wipe_replica_request,
wipe_replica_response,
StreamWipeOptions,
TestRpc,
WipeReplicaRequest,
Expand Down Expand Up @@ -55,6 +56,23 @@ impl TestRpc for TestService {
type WipeReplicaStream =
ReceiverStream<Result<WipeReplicaResponse, Status>>;

/// Get all the features supported by the test service.
async fn get_features(
&self,
_request: Request<()>,
) -> GrpcResult<v1::test::TestFeatures> {
GrpcResult::Ok(tonic::Response::new(v1::test::TestFeatures {
wipe_methods: vec![
v1::test::wipe_options::WipeMethod::None as i32,
v1::test::wipe_options::WipeMethod::WriteZeroes as i32,
v1::test::wipe_options::WipeMethod::Checksum as i32,
],
cksum_algs: vec![
v1::test::wipe_options::CheckSumAlgorithm::Crc32c as i32,
],
}))
}

#[named]
async fn wipe_replica(
&self,
Expand Down Expand Up @@ -248,6 +266,11 @@ impl TryFrom<&Option<StreamWipeOptions>>
options.write_pattern.unwrap_or(0xdeadbeef),
)
}
WipeMethod::Checksum => {
crate::core::wiper::WipeMethod::CkSum(
crate::core::wiper::CkSumMethod::default(),
)
}
})
.map_err(|error| {
tonic::Status::invalid_argument(error.to_string())
Expand All @@ -271,6 +294,9 @@ impl From<&WipeStats> for WipeReplicaResponse {
wiped_chunks: value.wiped_chunks,
remaining_bytes: value.total_bytes - value.wiped_bytes,
since: value.since.and_then(|d| TryInto::try_into(d).ok()),
checksum: value
.cksum_crc32c
.map(wipe_replica_response::Checksum::Crc32),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions io-engine/tests/wipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ async fn issue_wipe_replica(
options: Some(v1_rpc::test::WipeOptions {
wipe_method: wipe_method as i32,
write_pattern: None,
cksum_alg: 0,
}),
chunk_size,
}),
Expand Down
2 changes: 1 addition & 1 deletion spdk-rs
Submodule spdk-rs updated 1 files
+1 −0 wrapper.h
Loading