diff --git a/io-engine-tests/src/fio.rs b/io-engine-tests/src/fio.rs index 41951b8c0..329aecf7e 100644 --- a/io-engine-tests/src/fio.rs +++ b/io-engine-tests/src/fio.rs @@ -1,36 +1,51 @@ use super::file_io::DataSize; -use std::sync::atomic::{AtomicU32, Ordering}; +use nix::errno::Errno; +use std::{ + path::Path, + sync::atomic::{AtomicU32, Ordering}, + time::{Duration, Instant}, +}; + +/// TODO +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum FioJobResult { + NotRun, + Ok, + Error(Errno), +} /// TODO #[derive(Debug, Clone)] #[allow(dead_code)] pub struct FioJob { /// Job counter. - counter: u32, + pub counter: u32, /// Job name. - name: String, + pub name: String, /// I/O engine to use. Default: spdk. - ioengine: String, + pub ioengine: String, /// Filename. - filename: String, + pub filename: String, /// Type of I/O pattern. - rw: String, + pub rw: String, /// If true, use non-buffered I/O (usually O_DIRECT). Default: true. - direct: bool, + pub direct: bool, /// Block size for I/O units. Default: 4k. - blocksize: Option, + pub blocksize: Option, /// Offset in the file to start I/O. Data before the offset will not be /// touched. - offset: Option, + pub offset: Option, /// Number of I/O units to keep in flight against the file. - iodepth: Option, + pub iodepth: Option, /// Number of clones (processes/threads performing the same workload) of /// this job. Default: 1. - numjobs: u32, + pub numjobs: u32, /// Terminate processing after the specified number of seconds. - runtime: Option, + pub runtime: Option, /// Total size of I/O for this job. - size: Option, + pub size: Option, + /// Run result. + pub result: FioJobResult, } impl Default for FioJob { @@ -58,6 +73,7 @@ impl FioJob { numjobs: 1, runtime: None, size: None, + result: FioJobResult::NotRun, } } @@ -100,6 +116,12 @@ impl FioJob { r } + /// Sets job name. + pub fn with_name(mut self, v: &str) -> Self { + self.name = v.to_string(); + self + } + /// I/O engine to use. Default: spdk. pub fn with_ioengine(mut self, v: &str) -> Self { self.ioengine = v.to_string(); @@ -112,6 +134,12 @@ impl FioJob { self } + /// Filename. + pub fn with_filename_path(mut self, v: impl AsRef) -> Self { + self.filename = v.as_ref().to_str().unwrap().to_string(); + self + } + /// Read-write FIO mode. pub fn with_rw(mut self, rw: &str) -> Self { self.rw = rw.to_string(); @@ -170,6 +198,10 @@ pub struct Fio { pub jobs: Vec, pub verbose: bool, pub verbose_err: bool, + pub script: String, + pub total_time: Duration, + pub exit: i32, + pub err_messages: Vec, } impl Fio { @@ -197,7 +229,7 @@ impl Fio { self } - pub fn run(&self) -> std::io::Result<()> { + pub fn run(mut self) -> Self { let cmd = "sudo -E LD_PRELOAD=$FIO_SPDK fio"; let args = self @@ -207,49 +239,110 @@ impl Fio { .collect::>() .join(" "); - let script = format!("{cmd} {args}"); + self.script = format!("{cmd} --output-format=json {args}"); - if self.verbose { - println!("{script}"); + if self.verbose || self.verbose_err { + println!("{}", self.script); } + let start_time = Instant::now(); let (exit, stdout, stderr) = run_script::run( - &script, + &self.script, &Vec::new(), &run_script::ScriptOptions::new(), ) .unwrap(); - if exit == 0 { - if self.verbose { - println!("FIO:"); - println!("{script}"); - println!("Output:"); - println!("{stdout}"); - } - - Ok(()) - } else { - if self.verbose_err { - println!("Error running FIO:"); - println!("{script}"); - println!("Exit code: {exit}"); - println!("Output:"); - println!("{stdout}"); - println!("Error output:"); - println!("{stderr}"); - } - - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("SPDK FIO error: {stderr}"), - )) + self.total_time = start_time.elapsed(); + self.push_err(&stderr); + self.exit = exit; + + if let Err(e) = self.update_result(&stdout) { + self.push_err(&e); + } + + if self.verbose_err { + println!( + "Error(s) running FIO: {s}", + s = self.err_messages.join("\n") + ); + } + + self + } + + /// TODO + fn push_err(&mut self, msg: &str) { + let s = msg.trim_end_matches('\n'); + if !s.is_empty() { + self.err_messages.push(s.to_string()); } } + + /// TODO + fn update_result(&mut self, out: &str) -> Result<(), String> { + // Filter out lines error messages, those starting with "fio: ". + let out = out + .split('\n') + .filter(|s| !s.starts_with("fio: ")) + .collect::>() + .join("\n"); + + serde_json::from_str::(&out) + .map_err(|e| e.to_string())? + .get("jobs") + .ok_or_else(|| "No 'jobs' item in output".to_string())? + .as_array() + .ok_or_else(|| "'jobs' item in output is not an array".to_string())? + .iter() + .for_each(|j| { + let name = + j.get("jobname").unwrap().as_str().unwrap().to_string(); + let err = j.get("error").unwrap().as_i64().unwrap() as i32; + + if let Some(j) = self.find_job_mut(&name) { + if err == 0 { + j.result = FioJobResult::Ok; + } else { + j.result = FioJobResult::Error(Errno::from_i32(err)); + } + } + }); + + Ok(()) + } + + /// TODO + pub fn find_job(&self, name: &str) -> Option<&FioJob> { + self.jobs.iter().find(|j| j.name == name) + } + + /// TODO + pub fn find_job_mut(&mut self, name: &str) -> Option<&mut FioJob> { + self.jobs.iter_mut().find(|j| j.name == name) + } } -/// TODO -pub async fn run_fio_jobs(fio: &Fio) -> std::io::Result<()> { - let fio = fio.clone(); - tokio::spawn(async move { fio.run() }).await.unwrap() +/// Spawns a tokio task and runs the given FIO on it. Any FIO error is converted +/// into an `std::io::Result`. +pub async fn spawn_fio_task(fio: &Fio) -> std::io::Result<()> { + let fio = tokio::spawn({ + let fio = fio.clone(); + async move { fio.run() } + }) + .await + .unwrap(); + + if fio.exit == 0 { + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "SPDK FIO error: {exit} {err_msg}", + exit = fio.exit, + err_msg = fio.err_messages.join("\n") + ), + )) + } } diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs index 5f06d0ea8..c2bcb2eda 100644 --- a/io-engine-tests/src/nexus.rs +++ b/io-engine-tests/src/nexus.rs @@ -11,6 +11,8 @@ use super::{ DestroyNexusRequest, ListNexusOptions, Nexus, + NexusNvmePreemption, + NvmeReservation, PublishNexusRequest, RebuildHistoryRecord, RebuildHistoryRequest, @@ -46,6 +48,7 @@ pub struct NexusBuilder { resv_key: u64, preempt_key: u64, resv_type: Option, + preempt_policy: i32, children: Option>, nexus_info_key: Option, serial: Option, @@ -63,6 +66,7 @@ impl NexusBuilder { resv_key: 1, preempt_key: 0, resv_type: None, + preempt_policy: 0, children: None, nexus_info_key: None, serial: None, @@ -120,6 +124,21 @@ impl NexusBuilder { self.with_bdev(&r.bdev()) } + pub fn with_resv_key(mut self, r: u64) -> Self { + self.resv_key = r; + self + } + + pub fn with_resv_type(mut self, r: NvmeReservation) -> Self { + self.resv_type = Some(r as i32); + self + } + + pub fn with_preempt_policy(mut self, r: NexusNvmePreemption) -> Self { + self.preempt_policy = r as i32; + self + } + fn replica_uri(&self, r: &ReplicaBuilder) -> String { if r.rpc() == self.rpc() { r.bdev() @@ -174,7 +193,7 @@ impl NexusBuilder { children: self.children.as_ref().unwrap().clone(), nexus_info_key: self.nexus_info_key.as_ref().unwrap().clone(), resv_type: self.resv_type, - preempt_policy: 0, + preempt_policy: self.preempt_policy, }) .await .map(|r| r.into_inner().nexus.unwrap()) diff --git a/io-engine-tests/src/nvmf.rs b/io-engine-tests/src/nvmf.rs index 074a53720..6e1f90ac6 100644 --- a/io-engine-tests/src/nvmf.rs +++ b/io-engine-tests/src/nvmf.rs @@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf}; use super::{ file_io::{compare_files, test_write_to_file, DataSize}, - fio::{run_fio_jobs, Fio}, + fio::{spawn_fio_task, Fio}, nexus::{make_nexus_nqn, make_nexus_serial}, nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, }; @@ -89,7 +89,7 @@ pub async fn test_fio_to_nvmf( }) .collect(); - run_fio_jobs(&fio).await + spawn_fio_task(&fio).await } /// TODO @@ -111,5 +111,5 @@ pub async fn test_fio_to_nvmf_aio( }) .collect(); - run_fio_jobs(&fio).await + spawn_fio_task(&fio).await } diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index b483e3eaa..57e63f576 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -61,6 +61,7 @@ use crate::{ subsys::NvmfSubsystem, }; +use crate::core::IoCompletionStatus; use events_api::event::EventAction; use spdk_rs::{ BdevIo, @@ -261,6 +262,8 @@ pub struct Nexus<'n> { pub(super) nexus_target: Option, /// Indicates if the Nexus has an I/O device. pub(super) has_io_device: bool, + /// Initiators. + initiators: parking_lot::Mutex>, /// Information associated with the persisted NexusInfo structure. pub(super) nexus_info: futures::lock::Mutex, /// Nexus I/O subsystem. @@ -271,10 +274,10 @@ pub struct Nexus<'n> { pub(super) rebuild_history: parking_lot::Mutex>, /// Flag to control shutdown from I/O path. pub(crate) shutdown_requested: AtomicCell, + /// Last child I/O error. + pub(super) last_error: IoCompletionStatus, /// Prevent auto-Unpin. _pin: PhantomPinned, - /// Initiators. - initiators: parking_lot::Mutex>, } impl<'n> Debug for Nexus<'n> { @@ -379,6 +382,7 @@ impl<'n> Nexus<'n> { event_sink: None, rebuild_history: parking_lot::Mutex::new(Vec::new()), shutdown_requested: AtomicCell::new(false), + last_error: IoCompletionStatus::Success, _pin: Default::default(), }; diff --git a/io-engine/src/bdev/nexus/nexus_io.rs b/io-engine/src/bdev/nexus/nexus_io.rs index 837256e52..8caaa9c9a 100644 --- a/io-engine/src/bdev/nexus/nexus_io.rs +++ b/io-engine/src/bdev/nexus/nexus_io.rs @@ -1,6 +1,7 @@ use std::{ fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, + pin::Pin, }; use libc::c_void; @@ -9,8 +10,10 @@ use nix::errno::Errno; use spdk_rs::{ libspdk::{ spdk_bdev_io, + spdk_bdev_io_complete_nvme_status, spdk_io_channel, SPDK_NVME_SC_ABORTED_SQ_DELETION, + SPDK_NVME_SC_CAPACITY_EXCEEDED, SPDK_NVME_SC_INVALID_OPCODE, SPDK_NVME_SC_RESERVATION_CONFLICT, }, @@ -207,11 +210,19 @@ impl<'n> NexusBio<'n> { } } - /// Obtains the Nexus struct embedded within the bdev. + /// Obtains a reference to the Nexus struct embedded within the bdev. + #[inline(always)] pub(crate) fn nexus(&self) -> &Nexus<'n> { self.bdev_checked(NEXUS_PRODUCT_ID).data() } + /// Obtains a mutable reference to the Nexus struct embedded within the + /// bdev. + #[inline(always)] + fn nexus_mut(&mut self) -> Pin<&mut Nexus<'n>> { + self.bdev_checked(NEXUS_PRODUCT_ID).data_mut() + } + /// Invoked when a nexus IO completes. fn child_completion( device: &dyn BlockDevice, @@ -234,7 +245,7 @@ impl<'n> NexusBio<'n> { self.driver_ctx_mut::() } - /// completion handler for the nexus when a child IO completes + /// Completion handler for the nexus when a child I/O completes. fn complete( &mut self, child: &dyn BlockDevice, @@ -270,10 +281,37 @@ impl<'n> NexusBio<'n> { self.resubmit(); } else { error!("{self:?}: failing nexus I/O: all child I/Os failed"); + + unsafe { + self.nexus_mut().get_unchecked_mut().last_error = status; + } + self.fail(); } } + /// Fails the current I/O with a generic internal error. If the nexus + /// already had a last child error, it fails with it. + fn fail(&self) { + match self.nexus().last_error { + IoCompletionStatus::NvmeError(s) => self.fail_nvme_status(s), + IoCompletionStatus::LvolError(LvolFailure::NoSpace) => self + .fail_nvme_status(NvmeStatus::Generic( + SPDK_NVME_SC_CAPACITY_EXCEEDED, + )), + _ => self.0.fail(), + } + } + + /// Completes the I/O with the given `NvmeStatus`. + #[inline(always)] + fn fail_nvme_status(&self, status: NvmeStatus) { + let (sct, sc) = status.as_sct_sc_codes(); + unsafe { + spdk_bdev_io_complete_nvme_status(self.as_ptr(), 0, sct, sc); + } + } + /// Resubmits the I/O. fn resubmit(&mut self) { warn!("{self:?}: resubmitting nexus I/O due to a child I/O failure"); diff --git a/io-engine/src/core/env.rs b/io-engine/src/core/env.rs index 82f4184a1..0dc1207f0 100644 --- a/io-engine/src/core/env.rs +++ b/io-engine/src/core/env.rs @@ -4,6 +4,7 @@ use std::{ net::Ipv4Addr, os::raw::{c_char, c_void}, pin::Pin, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, @@ -54,6 +55,7 @@ use crate::{ persistent_store::PersistentStoreBuilder, subsys::{ self, + config::opts::TARGET_CRDT_LEN, registration::registration_grpc::ApiVersion, Config, PoolConfig, @@ -87,6 +89,30 @@ fn parse_ps_timeout(src: &str) -> Result { .map(|d| d.clamp(Duration::from_secs(1), Duration::from_secs(60))) } +/// Parses Command Retry Delay(s): either a single integer or a comma-separated +/// list of three integers. +fn parse_crdt(src: &str) -> Result<[u16; TARGET_CRDT_LEN], String> { + fn parse_val(s: &str) -> Result { + let u = u16::from_str(s).map_err(|e| e.to_string())?; + if u > 100 { + Err("Command Retry Delay value is too big".to_string()) + } else { + Ok(u) + } + } + + let items = src.split(',').collect::>(); + match items.as_slice() { + [one] => Ok([parse_val(one)?, 0, 0]), + [one, two, three] => { + Ok([parse_val(one)?, parse_val(two)?, parse_val(three)?]) + } + _ => Err("Command Retry Delay argument must be an integer or \ + a comma-separated list of three intergers" + .to_string()), + } +} + #[derive(Debug, Clone, Parser)] #[clap( name = package_description!(), @@ -163,9 +189,18 @@ pub struct MayastorCliArgs { #[clap(short = 'T', long = "tgt-iface", env = "NVMF_TGT_IFACE")] /// NVMF target interface (ip, mac, name or subnet). pub nvmf_tgt_interface: Option, - /// NVMF target Command Retry Delay. - #[clap(long = "tgt-crdt", env = "NVMF_TGT_CRDT", default_value = "0")] - pub nvmf_tgt_crdt: u16, + /// NVMF target Command Retry Delay in x100 ms (single integer or three + /// comma-separated integers). First value is used for errors on nexus + /// target except reservation conflict and no space; second + /// value is used for reservation conflict and no space on nexus target; + /// third value is used for all errors on replica target. + #[clap( + long = "tgt-crdt", + env = "NVMF_TGT_CRDT", + default_value = "0", + value_parser = parse_crdt, + )] + pub nvmf_tgt_crdt: [u16; TARGET_CRDT_LEN], /// The gRPC api version. #[clap( long, @@ -242,7 +277,7 @@ impl Default for MayastorCliArgs { nvme_ctl_io_ctx_pool_size: 65535, registration_endpoint: None, nvmf_tgt_interface: None, - nvmf_tgt_crdt: 0, + nvmf_tgt_crdt: [0; TARGET_CRDT_LEN], api_versions: vec![ApiVersion::V0, ApiVersion::V1], diagnose_stack: None, reactor_freeze_detection: false, @@ -344,7 +379,8 @@ pub struct MayastorEnvironment { bdev_io_ctx_pool_size: u64, nvme_ctl_io_ctx_pool_size: u64, nvmf_tgt_interface: Option, - pub nvmf_tgt_crdt: u16, + /// NVMF target Command Retry Delay in x100 ms. + pub nvmf_tgt_crdt: [u16; TARGET_CRDT_LEN], api_versions: Vec, skip_sig_handler: bool, enable_io_all_thrd_nexus_channels: bool, @@ -391,7 +427,7 @@ impl Default for MayastorEnvironment { bdev_io_ctx_pool_size: 65535, nvme_ctl_io_ctx_pool_size: 65535, nvmf_tgt_interface: None, - nvmf_tgt_crdt: 0, + nvmf_tgt_crdt: [0; TARGET_CRDT_LEN], api_versions: vec![ApiVersion::V0, ApiVersion::V1], skip_sig_handler: false, enable_io_all_thrd_nexus_channels: false, diff --git a/io-engine/src/core/mod.rs b/io-engine/src/core/mod.rs index 38fca91f9..92994d388 100644 --- a/io-engine/src/core/mod.rs +++ b/io-engine/src/core/mod.rs @@ -78,6 +78,8 @@ pub use snapshot::{ SnapshotXattrs, }; +use spdk_rs::libspdk::SPDK_NVME_SC_CAPACITY_EXCEEDED; + mod bdev; mod block_device; mod descriptor; @@ -496,10 +498,13 @@ impl Debug for IoCompletionStatus { impl From for IoCompletionStatus { fn from(s: NvmeStatus) -> Self { - if s == NvmeStatus::VendorSpecific(libc::ENOSPC) { - IoCompletionStatus::LvolError(LvolFailure::NoSpace) - } else { - IoCompletionStatus::NvmeError(s) + match s { + NvmeStatus::NO_SPACE + | NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => { + IoCompletionStatus::LvolError(LvolFailure::NoSpace) + } + + _ => IoCompletionStatus::NvmeError(s), } } } diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs index 68d4eca3a..23af1aa37 100644 --- a/io-engine/src/subsys/config/opts.rs +++ b/io-engine/src/subsys/config/opts.rs @@ -76,6 +76,10 @@ impl GetOpts for NexusOpts { } } +/// Length of target Command Retry Delay configuration array. +/// Must be equal to the size of `spdk_nvmf_target_opts.crdt`. +pub const TARGET_CRDT_LEN: usize = 3; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct NvmfTgtConfig { @@ -83,8 +87,8 @@ pub struct NvmfTgtConfig { pub name: String, /// the max number of namespaces this target should allow for pub max_namespaces: u32, - /// Command Retry Delay. - pub crdt: u16, + /// NVMF target Command Retry Delay in x100 ms. + pub crdt: [u16; TARGET_CRDT_LEN], /// TCP transport options pub opts: NvmfTcpTransportOpts, } @@ -94,7 +98,7 @@ impl From for Box { let mut out = Self::default(); copy_str_with_null(&o.name, &mut out.name); out.max_subsystems = o.max_namespaces; - out.crdt[0] = o.crdt; + out.crdt = o.crdt; out } } diff --git a/io-engine/src/subsys/mod.rs b/io-engine/src/subsys/mod.rs index dc512b5be..0518e9c0d 100644 --- a/io-engine/src/subsys/mod.rs +++ b/io-engine/src/subsys/mod.rs @@ -30,7 +30,7 @@ pub use registration::{ use crate::subsys::nvmf::Nvmf; -mod config; +pub(super) mod config; mod nvmf; /// Module for registration of the data-plane with control-plane pub mod registration; diff --git a/io-engine/src/subsys/nvmf/subsystem.rs b/io-engine/src/subsys/nvmf/subsystem.rs index 58ca2ac43..1899cb30d 100644 --- a/io-engine/src/subsys/nvmf/subsystem.rs +++ b/io-engine/src/subsys/nvmf/subsystem.rs @@ -14,8 +14,10 @@ use spdk_rs::{ nvmf_subsystem_set_ana_state, nvmf_subsystem_set_cntlid_range, spdk_bdev_nvme_opts, + spdk_nvmf_ctrlr_set_cpl_error_cb, spdk_nvmf_ns_get_bdev, spdk_nvmf_ns_opts, + spdk_nvmf_request, spdk_nvmf_subsystem, spdk_nvmf_subsystem_add_host, spdk_nvmf_subsystem_add_listener, @@ -46,9 +48,14 @@ use spdk_rs::{ spdk_nvmf_subsystem_state_change_done, spdk_nvmf_subsystem_stop, spdk_nvmf_tgt, + SPDK_NVME_SCT_GENERIC, + SPDK_NVME_SC_CAPACITY_EXCEEDED, + SPDK_NVME_SC_RESERVATION_CONFLICT, SPDK_NVMF_SUBTYPE_DISCOVERY, SPDK_NVMF_SUBTYPE_NVME, }, + NvmeStatus, + NvmfController, NvmfSubsystemEvent, }; @@ -64,6 +71,7 @@ use crate::{ }, }; +/// TODO #[derive(Debug, PartialOrd, PartialEq)] pub enum SubType { Nvme, @@ -235,20 +243,41 @@ impl NvmfSubsystem { let nexus = nexus_lookup_nqn(&nqn); let event = NvmfSubsystemEvent::from_cb_args(event, ctx); - debug!("NVMF subsystem event '{nqn}': {event:?}"); + debug!("NVMF subsystem event {subsystem:?}: {event:?}"); match event { NvmfSubsystemEvent::HostConnect(ctrlr) => { + info!( + "Subsystem '{nqn}': host connected: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { nex.add_initiator(&ctrlr.hostnqn()); + subsystem.host_connect_nexus(ctrlr); + } else { + subsystem.host_connect_replica(ctrlr); } } NvmfSubsystemEvent::HostDisconnect(ctrlr) => { + info!( + "Subsystem '{nqn}': host disconnected: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { nex.rm_initiator(&ctrlr.hostnqn()); + subsystem.host_disconnect_nexus(ctrlr); + } else { + subsystem.host_disconnect_replica(ctrlr); } } NvmfSubsystemEvent::HostKeepAliveTimeout(ctrlr) => { + warn!( + "Subsystem '{nqn}': host keep alive timeout: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { nex.initiator_keep_alive_timeout(&ctrlr.hostnqn()); } @@ -259,6 +288,99 @@ impl NvmfSubsystem { } } + /// Completion error callback for nexuses. + unsafe extern "C" fn nexus_cpl_error_cb( + req: *mut spdk_nvmf_request, + _cb_arg: *mut ::std::os::raw::c_void, + ) { + let req = &mut *req; + let cpl = req.nvme_cpl_mut(); + let mut status = cpl.status(); + + if status.crd() == 0 { + return; + } + + // Use CRD #2 for certain errors. + match status.status() { + NvmeStatus::Generic(SPDK_NVME_SC_RESERVATION_CONFLICT) + | NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => { + status.set_crd(2); + } + _ => {} + } + + cpl.set_status(status); + } + + /// Called upon a host connection to a nexus. + fn host_connect_nexus(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + Some(Self::nexus_cpl_error_cb), + std::ptr::null_mut(), + ); + } + } + + /// Called upon a host disconnection from a nexus. + fn host_disconnect_nexus(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + None, + std::ptr::null_mut(), + ); + } + } + + /// Completion error callback for replicas. + unsafe extern "C" fn replica_cpl_error_cb( + req: *mut spdk_nvmf_request, + _cb_arg: *mut ::std::os::raw::c_void, + ) { + let req = &mut *req; + let cpl = req.nvme_cpl_mut(); + + let mut status = cpl.status(); + + // Change CRD for replica to 3. + if status.crd() == 1 { + status.set_crd(3); + } + + // Correct vendor-specific ENOSPC error. + if status.status().is_no_space() { + status.set_sct(SPDK_NVME_SCT_GENERIC as u16); + status.set_sc(SPDK_NVME_SC_CAPACITY_EXCEEDED as u16); + } + + cpl.set_status(status); + } + + /// Called upon a host connection to a replica. + fn host_connect_replica(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + Some(Self::replica_cpl_error_cb), + std::ptr::null_mut(), + ); + } + } + + /// Called upon a host disconnection from a replica. + fn host_disconnect_replica(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + None, + std::ptr::null_mut(), + ); + } + } + /// create a new subsystem where the NQN is based on the UUID pub fn new(uuid: &str) -> Result { let nqn = make_nqn(uuid).into_cstring(); diff --git a/io-engine/tests/nexus_crd.rs b/io-engine/tests/nexus_crd.rs new file mode 100644 index 000000000..6228bb847 --- /dev/null +++ b/io-engine/tests/nexus_crd.rs @@ -0,0 +1,411 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use std::{sync::Arc, time::Duration}; +use tokio::sync::{ + oneshot, + oneshot::{Receiver, Sender}, +}; + +use common::{ + compose::{ + rpc::v1::{ + nexus::{NexusNvmePreemption, NvmeReservation}, + GrpcConnect, + SharedRpcHandle, + }, + Binary, + Builder, + }, + file_io::DataSize, + fio::{spawn_fio_task, Fio, FioJob, FioJobResult}, + nexus::NexusBuilder, + nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, + nvmf::NvmfLocation, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::{add_fault_injection, remove_fault_injection}, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + InjectionBuilder, +}; + +const POOL_SIZE: u64 = 500; +const REPL_SIZE: u64 = 450; +const REPL_UUID: &str = "65acdaac-14c4-41d8-a55e-d03bfd7185a4"; +const NEXUS_NAME: &str = "nexus_0"; +const NEXUS_SIZE: u64 = REPL_SIZE; +const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760"; + +/// Tests that without CRD enabled, initiator would eventually fail I/Os. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn nexus_fail_no_crd() { + test_nexus_fail("0") + .await + .expect_err("I/O expected to fail"); +} + +/// Tests that CRD properly delays I/O retries on initiator, while the target +/// has a chance to replace a failed nexus. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn nexus_fail_crd() { + test_nexus_fail("20") + .await + .expect("I/O expected to succeed"); +} + +async fn test_nexus_fail(crdt: &str) -> std::io::Result<()> { + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]), + ) + .add_container_bin( + "ms_nex", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1,2,3,4", + "--tgt-crdt", + crdt, + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let test = Arc::new(test); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_0.create().await.unwrap(); + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); + + let children = nex_0.get_nexus().await.unwrap().children; + let dev_name = children[0].device_name.as_ref().unwrap(); + + let inj_w = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Write) + .build_uri() + .unwrap(); + + let inj_r = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Read) + .build_uri() + .unwrap(); + + let cfg = NexusManageTask { + ms_nex: ms_nex.clone(), + nex_0: nex_0.clone(), + repl_0: repl_0.clone(), + inj_w, + inj_r, + }; + + // Run two tasks in parallel, I/O and nexus management. + let (s, r) = oneshot::channel(); + + let j0 = tokio::spawn({ + let nvmf = nex_0.nvmf_location(); + async move { run_io_task(s, &nvmf, 10, 20).await } + }); + tokio::pin!(j0); + + let j1 = tokio::spawn({ + let cfg = cfg.clone(); + async move { + run_nexus_manage_task(r, cfg).await; + } + }); + tokio::pin!(j1); + + let (io_res, _) = tokio::join!(j0, j1); + io_res.unwrap() +} + +#[derive(Clone)] +struct NexusManageTask { + ms_nex: SharedRpcHandle, + nex_0: NexusBuilder, + repl_0: ReplicaBuilder, + inj_w: String, + inj_r: String, +} + +/// Runs multiple FIO I/O jobs. +async fn run_io_task( + s: Sender<()>, + nvmf: &NvmfLocation, + cnt: u32, + rt: u32, +) -> std::io::Result<()> { + let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn); + let path = find_mayastor_nvme_device_path(&nvmf.serial) + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let jobs = (0 .. cnt).map(|_| { + FioJob::new() + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(128) + .with_filename(&path) + .with_runtime(rt) + .with_rw("randwrite") + }); + + let fio = Fio::new().with_jobs(jobs); + + // Notify the nexus management task that connection is complete and I/O + // starts. + s.send(()).unwrap(); + + // Start FIO. + spawn_fio_task(&fio).await +} + +/// Manages the nexus in parallel to I/O task. +/// [1] Nexus is failed by injecting a fault. +/// [2] I/O running in parallel should freeze or fail, depending on how target's +/// configured. +/// [3] Nexus is recreated. +async fn run_nexus_manage_task(r: Receiver<()>, cfg: NexusManageTask) { + let NexusManageTask { + ms_nex, + inj_w, + inj_r, + mut nex_0, + repl_0, + .. + } = cfg; + + // Wait until I/O task connects and signals it is ready. + r.await.unwrap(); + + // Allow I/O to run for some time. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Inject fault, so the nexus would fail. + add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap(); + add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap(); + + // When nexus fails, I/O should be freezing due to CRD (if enabled). + tokio::time::sleep(Duration::from_secs(2)).await; + + // Destroy the nexus, remove injectios and re-create and re-publish the + // nexus with the same ID. + // I/O would eventually retry and the new nexus would run I/O. + nex_0.shutdown().await.unwrap(); + nex_0.destroy().await.unwrap(); + + remove_fault_injection(ms_nex.clone(), &inj_w) + .await + .unwrap(); + remove_fault_injection(ms_nex.clone(), &inj_r) + .await + .unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); +} + +#[tokio::test] +async fn nexus_crd_resv() { + common::composer_init(); + + const HOSTID_0: &str = "53b35ce9-8e71-49a9-ab9b-cba7c5670fad"; + const HOSTID_1: &str = "c1affd2d-ef79-4ba4-b5cf-8eb48f9c07d0"; + const HOSTID_2: &str = "3f264cc3-1c95-44ca-bc1f-ed7fb68f3894"; + const PTPL_CONTAINER_DIR: &str = "/host/tmp/ptpl"; + const RESV_KEY_1: u64 = 0xabcd_ef00_1234_5678; + const RESV_KEY_2: u64 = 0xfeed_f00d_bead_5678; + + // Set 1st, 3nd CRD to non-zero value and 2nd to zero. + // Nexus reservation must select the second one (zero). + const CRDT: &str = "0,15,0"; + const TOTAL_DELAY: u64 = 15 * 5 * 100; + + let ptpl_dir = |ms| format!("{PTPL_CONTAINER_DIR}/{ms}"); + + let test = Builder::new() + .name("nexus_crd_resv_test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_0) + .with_args(vec![ + "-l", + "1", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_0").as_str(), + ]) + .with_bind("/tmp", "/host/tmp"), + ) + .add_container_bin( + "ms_1", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_1) + .with_args(vec![ + "-l", + "2", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_1").as_str(), + ]) + .with_bind("/tmp", "/host/tmp"), + ) + .add_container_bin( + "ms_2", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_2) + .with_args(vec![ + "-l", + "3", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_2").as_str(), + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_1 = conn.grpc_handle_shared("ms_1").await.unwrap(); + let ms_2 = conn.grpc_handle_shared("ms_2").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_uuid(REPL_UUID) + .with_size_mb(REPL_SIZE) + .with_thin(false); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + // Create nexus #1. + let mut nex_1 = NexusBuilder::new(ms_1.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_resv_key(RESV_KEY_1) + .with_resv_type(NvmeReservation::ExclusiveAccess) + .with_preempt_policy(NexusNvmePreemption::Holder); + + nex_1.create().await.unwrap(); + nex_1.publish().await.unwrap(); + + // Create nexus #2. + let mut nex_2 = NexusBuilder::new(ms_2.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_resv_key(RESV_KEY_2) + .with_resv_type(NvmeReservation::ExclusiveAccess) + .with_preempt_policy(NexusNvmePreemption::Holder); + + nex_2.create().await.unwrap(); + nex_2.publish().await.unwrap(); + + // Run I/O on the first nexus, causing SPDK_NVME_SC_RESERVATION_CONFLICT. + // io-engine must select 2nd CRD, which is configured to be zero. + let fio_res = { + let (_cg, path) = nex_1.nvmf_location().open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j0") + .with_filename_path(path) + .with_ioengine("libaio") + .with_iodepth(1) + .with_direct(true) + .with_rw("write") + .with_size(DataSize::from_kb(4)), + ); + + tokio::spawn(async { fio.run() }).await.unwrap() + }; + assert!(fio_res.total_time < Duration::from_millis(TOTAL_DELAY)); + + // The required errno (EBADE) exists on Linux-like targets only. On other + // platforms like macos, an IDE would highlight it as an error. + #[cfg(target_os = "linux")] + assert_eq!( + fio_res.find_job("j0").unwrap().result, + FioJobResult::Error(nix::errno::Errno::EBADE) + ); +} diff --git a/io-engine/tests/nexus_fail_crd.rs b/io-engine/tests/nexus_fail_crd.rs deleted file mode 100644 index ccb8ff219..000000000 --- a/io-engine/tests/nexus_fail_crd.rs +++ /dev/null @@ -1,229 +0,0 @@ -pub mod common; - -use std::{sync::Arc, time::Duration}; -use tokio::sync::{ - oneshot, - oneshot::{Receiver, Sender}, -}; - -use common::{ - compose::{ - rpc::v1::{GrpcConnect, SharedRpcHandle}, - Binary, - Builder, - }, - fio::{Fio, FioJob}, - nexus::NexusBuilder, - nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, - pool::PoolBuilder, - replica::ReplicaBuilder, - test::{add_fault_injection, remove_fault_injection}, -}; - -const POOL_SIZE: u64 = 500; -const REPL_SIZE: u64 = 450; -const NEXUS_NAME: &str = "nexus_0"; -const NEXUS_SIZE: u64 = REPL_SIZE; -const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760"; - -/// Tests that without CRD enabled, initiator would eventually fail I/Os. -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn nexus_fail_no_crd() { - test_nexus_fail("0") - .await - .expect_err("I/O expected to fail"); -} - -/// Tests that CRD properly delays I/O retries on initiator, while the target -/// has a chance to replace a failed nexus. -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn nexus_fail_crd() { - test_nexus_fail("20") - .await - .expect("I/O expected to succeed"); -} - -async fn test_nexus_fail(crdt: &str) -> std::io::Result<()> { - common::composer_init(); - - let test = Builder::new() - .name("cargo-test") - .network("10.1.0.0/16") - .unwrap() - .add_container_bin( - "ms_0", - Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]), - ) - .add_container_bin( - "ms_nex", - Binary::from_dbg("io-engine").with_args(vec![ - "-l", - "1,2,3,4", - "--tgt-crdt", - crdt, - ]), - ) - .with_clean(true) - .build() - .await - .unwrap(); - - let test = Arc::new(test); - - let conn = GrpcConnect::new(&test); - - let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); - let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); - - let mut pool_0 = PoolBuilder::new(ms_0.clone()) - .with_name("pool0") - .with_new_uuid() - .with_malloc("mem0", POOL_SIZE); - - let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) - .with_pool(&pool_0) - .with_name("r0") - .with_new_uuid() - .with_size_mb(REPL_SIZE) - .with_thin(false); - - pool_0.create().await.unwrap(); - repl_0.create().await.unwrap(); - repl_0.share().await.unwrap(); - - let mut nex_0 = NexusBuilder::new(ms_nex.clone()) - .with_name(NEXUS_NAME) - .with_uuid(NEXUS_UUID) - .with_size_mb(NEXUS_SIZE) - .with_replica(&repl_0); - - nex_0.create().await.unwrap(); - nex_0.publish().await.unwrap(); - - let children = nex_0.get_nexus().await.unwrap().children; - let dev_name = children[0].device_name.as_ref().unwrap(); - - let inj = "domain=child&op=write&stage=compl&offset=0"; - let inj_w = format!("inject://{dev_name}?{inj}"); - - let inj = "domain=child&op=read&stage=compl&offset=0"; - let inj_r = format!("inject://{dev_name}?{inj}"); - - let cfg = JobCfg { - ms_nex: ms_nex.clone(), - nex_0: nex_0.clone(), - repl_0: repl_0.clone(), - inj_w: inj_w.clone(), - inj_r: inj_r.clone(), - }; - - // Run two tasks in parallel, I/O and nexus management. - let (s, r) = oneshot::channel(); - - let j0 = tokio::spawn({ - let cfg = cfg.clone(); - async move { run_io_task(s, cfg).await } - }); - tokio::pin!(j0); - - let j1 = tokio::spawn({ - let cfg = cfg.clone(); - async move { - run_manage_task(r, cfg).await; - } - }); - tokio::pin!(j1); - - let (io_res, _) = tokio::join!(j0, j1); - io_res.unwrap() -} - -#[derive(Clone)] -struct JobCfg { - ms_nex: SharedRpcHandle, - nex_0: NexusBuilder, - repl_0: ReplicaBuilder, - inj_w: String, - inj_r: String, -} - -/// Runs multiple FIO I/O jobs. -async fn run_io_task(s: Sender<()>, cfg: JobCfg) -> std::io::Result<()> { - let nvmf = cfg.nex_0.nvmf_location(); - let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn); - let path = find_mayastor_nvme_device_path(&nvmf.serial) - .unwrap() - .to_str() - .unwrap() - .to_string(); - - let jobs = (0 .. 10).map(|_| { - FioJob::new() - .with_direct(true) - .with_ioengine("libaio") - .with_iodepth(128) - .with_filename(&path) - .with_runtime(20) - .with_rw("randwrite") - }); - - let fio = Fio::new().with_jobs(jobs); - - // Notify the nexus management task that connection is complete and I/O - // starts. - s.send(()).unwrap(); - - // Start FIO. - tokio::spawn(async move { fio.run() }).await.unwrap() -} - -/// Manages the nexus in parallel to I/O task. -/// [1] Nexus is failed by injecting a fault. -/// [2] I/O running in parallel should freeze or fail, depending on how target's -/// configured. -/// [3] Nexus is recreated. -async fn run_manage_task(r: Receiver<()>, cfg: JobCfg) { - let JobCfg { - ms_nex, - inj_w, - inj_r, - mut nex_0, - repl_0, - .. - } = cfg; - - // Wait until I/O task connects and signals it is ready. - r.await.unwrap(); - - // Allow I/O to run for some time. - tokio::time::sleep(Duration::from_secs(2)).await; - - // Inject fault, so the nexus would fail. - add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap(); - add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap(); - - // When nexus fails, I/O should be freezing due to CRD (if enabled). - tokio::time::sleep(Duration::from_secs(2)).await; - - // Destroy the nexus, remove injectios and re-create and re-publish the - // nexus with the same ID. - // I/O would eventually retry and the new nexus would run I/O. - nex_0.shutdown().await.unwrap(); - nex_0.destroy().await.unwrap(); - - remove_fault_injection(ms_nex.clone(), &inj_w) - .await - .unwrap(); - remove_fault_injection(ms_nex.clone(), &inj_r) - .await - .unwrap(); - - let mut nex_0 = NexusBuilder::new(ms_nex.clone()) - .with_name(NEXUS_NAME) - .with_uuid(NEXUS_UUID) - .with_size_mb(NEXUS_SIZE) - .with_replica(&repl_0); - - nex_0.create().await.unwrap(); - nex_0.publish().await.unwrap(); -} diff --git a/io-engine/tests/nexus_fault_injection.rs b/io-engine/tests/nexus_fault_injection.rs index bad1d9c46..daf96b718 100644 --- a/io-engine/tests/nexus_fault_injection.rs +++ b/io-engine/tests/nexus_fault_injection.rs @@ -4,13 +4,16 @@ pub mod common; use std::time::Duration; -use io_engine::core::fault_injection::{ - FaultDomain, - FaultIoOperation, - FaultIoStage, - FaultMethod, - Injection, - InjectionBuilder, +use io_engine::core::{ + fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + FaultMethod, + Injection, + InjectionBuilder, + }, + IoCompletionStatus, }; use common::{ @@ -24,16 +27,14 @@ use common::{ ComposeTest, }, file_io::DataSize, + fio::{spawn_fio_task, Fio, FioJob}, nexus::{test_write_to_nexus, NexusBuilder}, + nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, pool::PoolBuilder, replica::ReplicaBuilder, test::{add_fault_injection, list_fault_injections}, }; -use io_engine::core::IoCompletionStatus; -use io_engine_tests::{ - fio::{Fio, FioJob}, - nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, -}; + use spdk_rs::NvmeStatus; static POOL_SIZE: u64 = 60; @@ -493,14 +494,12 @@ async fn replica_bdev_io_injection() { .with_rw("write"), ); - tokio::spawn(async move { fio_ok.run() }) + spawn_fio_task(&fio_ok) .await - .unwrap() .expect("This FIO job must succeed"); - let r = tokio::spawn(async move { fio_fail.run() }) + let r = spawn_fio_task(&fio_fail) .await - .unwrap() .expect_err("This FIO job must fail"); assert_eq!(r.kind(), std::io::ErrorKind::Other); diff --git a/io-engine/tests/replica_crd.rs b/io-engine/tests/replica_crd.rs new file mode 100644 index 000000000..d9e750967 --- /dev/null +++ b/io-engine/tests/replica_crd.rs @@ -0,0 +1,108 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use nix::errno::Errno; +use std::time::Duration; + +use common::{ + compose::{rpc::v1::GrpcConnect, Binary, Builder}, + fio::{Fio, FioJob, FioJobResult}, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::add_fault_injection, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + FaultMethod, + InjectionBuilder, +}; + +// Test that the third CRD value is used for a replica target. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn replica_no_fail_crd() { + const POOL_SIZE: u64 = 100; + const REPL_SIZE: u64 = 80; + const REPL_NAME: &str = "r0"; + + // Set 1st, 2nd CRD to non-zero value and 3rd to zero. + // Replica must select the third one (zero). + const CRDT: &str = "15,15,0"; + + const TOTAL_DELAY: u64 = 15 * 5 * 100; + + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + "--tgt-crdt", + CRDT, + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name(REPL_NAME) + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + // Injection. + let inj_uri = InjectionBuilder::default() + .with_device_name(REPL_NAME.to_string()) + .with_domain(FaultDomain::BdevIo) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Submission) + .with_method(FaultMethod::DATA_TRANSFER_ERROR) + .with_offset(1000, 1) + .build_uri() + .unwrap(); + + add_fault_injection(rpc.clone(), &inj_uri).await.unwrap(); + + let (_cg, path) = repl_0.nvmf_location().open().unwrap(); + + // FIO jobs. + let fio = Fio::new().with_job( + FioJob::new() + .with_name("job0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let fio_res = tokio::spawn(async { fio.run() }).await.unwrap(); + let job_res = fio_res.find_job("job0").unwrap(); + + assert_eq!(job_res.result, FioJobResult::Error(Errno::EIO)); + assert!(fio_res.total_time < Duration::from_millis(TOTAL_DELAY)); +} diff --git a/io-engine/tests/replica_thin_no_space.rs b/io-engine/tests/replica_thin_no_space.rs new file mode 100644 index 000000000..d4a7ae8c0 --- /dev/null +++ b/io-engine/tests/replica_thin_no_space.rs @@ -0,0 +1,166 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use nix::errno::Errno; + +use common::{ + compose::{rpc::v1::GrpcConnect, Binary, Builder}, + fio::{Fio, FioJob, FioJobResult}, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::add_fault_injection, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoStage, + InjectionBuilder, +}; + +use spdk_rs::NvmeStatus; + +#[tokio::test] +async fn replica_thin_nospc() { + common::composer_init(); + + const BLK_SIZE: u64 = 512; + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc_blk_size("mem0", 100, BLK_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(80) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut repl_1 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r1") + .with_new_uuid() + .with_size_mb(80) + .with_thin(false); + + repl_1.create().await.unwrap(); + + let nvmf = repl_0.nvmf_location(); + let (_nvmf_conn, path) = nvmf.open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j-0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let res = tokio::spawn(async move { fio.run() }).await.unwrap(); + + assert!(matches!( + res.find_job("j-0").unwrap().result, + FioJobResult::Error(Errno::ENOSPC) + )); +} + +#[tokio::test] +async fn replica_nospc_inject() { + common::composer_init(); + + const BLK_SIZE: u64 = 512; + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc_blk_size("mem0", 100, BLK_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(80) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let inj_uri = InjectionBuilder::default() + .with_device_name("r0".to_string()) + .with_domain(FaultDomain::BdevIo) + .with_io_stage(FaultIoStage::Submission) + .with_method_nvme_error(NvmeStatus::NO_SPACE) + .build_uri() + .unwrap(); + + add_fault_injection(rpc.clone(), &inj_uri).await.unwrap(); + + let nvmf = repl_0.nvmf_location(); + let (_nvmf_conn, path) = nvmf.open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j-0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let res = tokio::spawn(async move { fio.run() }).await.unwrap(); + + assert!(matches!( + res.find_job("j-0").unwrap().result, + FioJobResult::Error(Errno::ENOSPC) + )); +} diff --git a/nix/pkgs/libspdk/default.nix b/nix/pkgs/libspdk/default.nix index 65baa50a5..913387e36 100644 --- a/nix/pkgs/libspdk/default.nix +++ b/nix/pkgs/libspdk/default.nix @@ -56,13 +56,13 @@ let # 7. Copy SHA256 from 'got' of the error message to 'sha256' field. # 8. 'nix-shell' build must now succeed. drvAttrs = rec { - version = "23.05-b1f0b4e"; + version = "23.05-baffd90"; src = fetchFromGitHub { owner = "openebs"; repo = "spdk"; - rev = "b1f0b4ea640441e4cde551aae2a3368fe811366e"; - sha256 = "sha256-F3wwEUnf1o30oMHAt3CRZpTOoeaqZwo7hRpPlr4S+Vg="; + rev = "baffd90809bdd0b113b76fc7c9d7663b69d26752"; + sha256 = "sha256-tyxtXh7RpU6VtBlEjZ5MotnKQ4uZbbLD5sV+ndkuHhc="; fetchSubmodules = true; };