From e1297989131f6ce197bac4d6139c671adda2a463 Mon Sep 17 00:00:00 2001
From: Dmitry Savitskiy
Date: Tue, 15 Aug 2023 16:15:05 +0300
Subject: [PATCH 1/2] fix(nvmf): removing namespace before destroying
 subsystem

When a nexus Bdev is destroyed, SPDK sends out an SPDK_BDEV_EVENT_REMOVE
event, and in response to it the SPDK NVMF subsystem removes the
corresponding namespace. When the io-engine destroys a subsystem, it
first stops it. Sometimes the remove event for the nexus Bdev arrives
after the subsystem has been stopped but before it has been destroyed,
which trips an assertion in SPDK
(actual_old_state == expected_old_state). Removing the namespace
explicitly before destroying the subsystem avoids this race.

Signed-off-by: Dmitry Savitskiy
---
 io-engine/src/subsys/nvmf/subsystem.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/io-engine/src/subsys/nvmf/subsystem.rs b/io-engine/src/subsys/nvmf/subsystem.rs
index c33a0e542e..9f94eec26e 100644
--- a/io-engine/src/subsys/nvmf/subsystem.rs
+++ b/io-engine/src/subsys/nvmf/subsystem.rs
@@ -40,6 +40,7 @@ use spdk_rs::libspdk::{
     spdk_nvmf_subsystem_listener_get_trid,
     spdk_nvmf_subsystem_pause,
     spdk_nvmf_subsystem_remove_host,
+    spdk_nvmf_subsystem_remove_ns,
     spdk_nvmf_subsystem_resume,
     spdk_nvmf_subsystem_set_allow_any_host,
     spdk_nvmf_subsystem_set_ana_reporting,
@@ -185,7 +186,7 @@ impl NvmfSubsystem {
         ss.set_ana_reporting(false)?;
         ss.allow_any(false);
         if let Err(e) = ss.add_namespace(bdev, ptpl) {
-            ss.destroy();
+            ss.destroy_subsys();
             return Err(e);
         }
         Ok(ss)
@@ -537,6 +538,17 @@ impl NvmfSubsystem {
 
     /// destroy the subsystem
    pub fn destroy(&self) -> i32 {
+        unsafe {
+            if spdk_nvmf_subsystem_remove_ns(self.0.as_ptr(), 1) != 0 {
+                error!(?self, "failed to remove namespace while destroying");
+            }
+        }
+
+        self.destroy_subsys()
+    }
+
+    /// destroy the SPDK object for the subsystem
+    fn destroy_subsys(&self) -> i32 {
         unsafe {
             if (*self.0.as_ptr()).destroying {
                 warn!("Subsystem destruction already started");

From 76d40a38b141cccfe8b4172135d5e64253e988f9 Mon Sep 17 00:00:00 2001
From: Dmitry Savitskiy
Date: Tue, 15 Aug 2023 16:17:55 +0300
Subject: [PATCH 2/2] feat(nvmf): adding support for configuring target CRD

The NVMF target's Command Retry Delay (CRD) can now be configured via
the NVMF_TGT_CRDT environment variable. A new nexus_fail_crd test
exercises I/O retries against a failed and re-created nexus, both with
and without CRD.

Signed-off-by: Dmitry Savitskiy
---
 io-engine-tests/src/fio.rs          |  27 +++-
 io-engine-tests/src/nexus.rs        |  13 ++
 io-engine-tests/src/nvmf.rs         |   2 +-
 io-engine/src/subsys/config/opts.rs |   7 +
 io-engine/tests/nexus_fail_crd.rs   | 234 ++++++++++++++++++++++++++++
 5 files changed, 275 insertions(+), 8 deletions(-)
 create mode 100644 io-engine/tests/nexus_fail_crd.rs

diff --git a/io-engine-tests/src/fio.rs b/io-engine-tests/src/fio.rs
index eec11a53ce..41951b8c06 100644
--- a/io-engine-tests/src/fio.rs
+++ b/io-engine-tests/src/fio.rs
@@ -67,7 +67,7 @@ impl FioJob {
         let mut r = vec![
             format!("--name={}", self.name),
             format!("--ioengine={}", self.ioengine),
-            format!("--filename='{}'", self.filename),
+            format!("--filename={}", self.filename),
             format!("--thread=1"),
             format!("--direct={}", if self.direct { "1" } else { "0" }),
             format!("--norandommap=1"),
@@ -112,6 +112,12 @@ impl FioJob {
         self
     }
 
+    /// Read-write FIO mode.
+    pub fn with_rw(mut self, rw: &str) -> Self {
+        self.rw = rw.to_string();
+        self
+    }
+
     /// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
     pub fn with_direct(mut self, v: bool) -> Self {
         self.direct = v;
@@ -171,8 +177,8 @@ impl Fio {
         Default::default()
     }
 
-    pub fn with_jobs(mut self, jobs: Vec<FioJob>) -> Self {
-        self.jobs = jobs;
+    pub fn with_jobs(mut self, jobs: impl Iterator<Item = FioJob>) -> Self {
+        jobs.for_each(|j| self.jobs.push(j));
         self
     }
 
@@ -201,12 +207,12 @@ impl Fio {
             .collect::<Vec<_>>()
             .join(" ");
 
+        let script = format!("{cmd} {args}");
+
         if self.verbose {
-            println!("{cmd} {args}");
+            println!("{script}");
         }
 
-        let script = format!("{cmd} {args}");
-
         let (exit, stdout, stderr) = run_script::run(
             &script,
             &Vec::new(),
             &run_script::ScriptOptions::new(),
         )
         .unwrap();
 
         if exit == 0 {
+            if self.verbose {
+                println!("FIO:");
+                println!("{script}");
+                println!("Output:");
+                println!("{stdout}");
+            }
+
             Ok(())
         } else {
             if self.verbose_err {
                 println!("Error running FIO:");
-                println!("{cmd} {args}");
+                println!("{script}");
                 println!("Exit code: {exit}");
                 println!("Output:");
                 println!("{stdout}");
diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs
index 9281e3ed2b..5f06d0ea8f 100644
--- a/io-engine-tests/src/nexus.rs
+++ b/io-engine-tests/src/nexus.rs
@@ -15,6 +15,7 @@ use super::{
         RebuildHistoryRecord,
         RebuildHistoryRequest,
         RemoveChildNexusRequest,
+        ShutdownNexusRequest,
     },
     SharedRpcHandle,
     Status,
@@ -179,6 +180,18 @@ impl NexusBuilder {
             .map(|r| r.into_inner().nexus.unwrap())
     }
 
+    pub async fn shutdown(&mut self) -> Result<(), Status> {
+        self.rpc()
+            .lock()
+            .await
+            .nexus
+            .shutdown_nexus(ShutdownNexusRequest {
+                uuid: self.uuid(),
+            })
+            .await
+            .map(|_| ())
+    }
+
     pub async fn destroy(&mut self) -> Result<(), Status> {
         self.rpc()
             .lock()
diff --git a/io-engine-tests/src/nvmf.rs b/io-engine-tests/src/nvmf.rs
index 27b93f3646..074a537207 100644
--- a/io-engine-tests/src/nvmf.rs
+++ b/io-engine-tests/src/nvmf.rs
@@ -77,7 +77,7 @@ pub async fn test_fio_to_nvmf(
     nvmf: &NvmfLocation,
     mut fio: Fio,
 ) -> std::io::Result<()> {
-    let tgt = nvmf.as_args().join(" ");
+    let tgt = format!("'{}'", nvmf.as_args().join(" "));
 
     fio.jobs = fio
         .jobs
diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs
index 3e1e3e65b3..ffb100d35e 100644
--- a/io-engine/src/subsys/config/opts.rs
+++ b/io-engine/src/subsys/config/opts.rs
@@ -87,6 +87,13 @@ impl From<NvmfTgtConfig> for Box<spdk_nvmf_target_opts> {
         let mut out = Self::default();
         copy_str_with_null(&o.name, &mut out.name);
         out.max_subsystems = o.max_namespaces;
+        let c = try_from_env("NVMF_TGT_CRDT", 0);
+        if c > 0 {
+            out.crdt[0] = c;
+            info!("NVMF target Command Retry Delay (CRD) is {c} (in 100 ms units)");
+        } else {
+            info!("NVMF target Command Retry Delay (CRD) is disabled");
+        }
         out
     }
 }
diff --git a/io-engine/tests/nexus_fail_crd.rs b/io-engine/tests/nexus_fail_crd.rs
new file mode 100644
index 0000000000..2a3064061e
--- /dev/null
+++ b/io-engine/tests/nexus_fail_crd.rs
@@ -0,0 +1,234 @@
+pub mod common;
+
+use std::{sync::Arc, time::Duration};
+use tokio::sync::{
+    oneshot,
+    oneshot::{Receiver, Sender},
+};
+
+use common::{
+    compose::{
+        rpc::v1::{GrpcConnect, SharedRpcHandle},
+        Binary,
+        Builder,
+    },
+    fio::{Fio, FioJob},
+    nexus::NexusBuilder,
+    nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
+    pool::PoolBuilder,
+    replica::ReplicaBuilder,
+    test::{add_fault_injection, remove_fault_injection},
+};
+
+const POOL_SIZE: u64 = 500;
+const REPL_SIZE: u64 = 450;
+const NEXUS_NAME: &str = "nexus_0";
+const NEXUS_SIZE: u64 = REPL_SIZE;
+const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760";
+
+/// Tests that, without CRD enabled, the initiator would eventually fail I/Os.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn nexus_fail_no_crd() {
+    test_nexus_fail(false)
+        .await
+        .expect_err("I/O expected to fail");
+}
+
+/// Tests that CRD properly delays I/O retries on the initiator, while the
+/// target gets a chance to replace the failed nexus.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn nexus_fail_crd() {
+    test_nexus_fail(true)
+        .await
+        .expect("I/O expected to succeed");
+}
+
+async fn test_nexus_fail(crd: bool) -> std::io::Result<()> {
+    common::composer_init();
+
+    let nex_bin = Binary::from_dbg("io-engine")
+        .with_env(
+            "RUST_LOG",
+            "debug,io_engine::bdev::nexus::nexus_io=trace,h2=info",
+        )
+        .with_args(vec!["-l", "1,2,3,4", "-Fcolor,compact,nodate"]);
+
+    let nex_bin = if crd {
+        nex_bin.with_env("NVMF_TGT_CRDT", "20")
+    } else {
+        nex_bin
+    };
+
+    let test = Builder::new()
+        .name("cargo-test")
+        .network("10.1.0.0/16")
+        .unwrap()
+        .add_container_bin(
+            "ms_0",
+            Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]),
+        )
+        .add_container_bin("ms_nex", nex_bin)
+        .with_clean(true)
+        .build()
+        .await
+        .unwrap();
+
+    let test = Arc::new(test);
+
+    let conn = GrpcConnect::new(&test);
+
+    let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap();
+    let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap();
+
+    let mut pool_0 = PoolBuilder::new(ms_0.clone())
+        .with_name("pool0")
+        .with_new_uuid()
+        .with_malloc("mem0", POOL_SIZE);
+
+    let mut repl_0 = ReplicaBuilder::new(ms_0.clone())
+        .with_pool(&pool_0)
+        .with_name("r0")
+        .with_new_uuid()
+        .with_size_mb(REPL_SIZE)
+        .with_thin(false);
+
+    pool_0.create().await.unwrap();
+    repl_0.create().await.unwrap();
+    repl_0.share().await.unwrap();
+
+    let mut nex_0 = NexusBuilder::new(ms_nex.clone())
+        .with_name(NEXUS_NAME)
+        .with_uuid(NEXUS_UUID)
+        .with_size_mb(NEXUS_SIZE)
+        .with_replica(&repl_0);
+
+    nex_0.create().await.unwrap();
+    nex_0.publish().await.unwrap();
+
+    let children = nex_0.get_nexus().await.unwrap().children;
+    let dev_name = children[0].device_name.as_ref().unwrap();
+
+    let inj = "domain=nexus&op=write&offset=0";
+    let inj_w = format!("inject://{dev_name}?{inj}");
+
+    let inj = "domain=nexus&op=read&offset=0";
+    let inj_r = format!("inject://{dev_name}?{inj}");
+
+    let cfg = JobCfg {
+        ms_nex: ms_nex.clone(),
+        nex_0: nex_0.clone(),
+        repl_0: repl_0.clone(),
+        inj_w: inj_w.clone(),
+        inj_r: inj_r.clone(),
+    };
+
+    // Run two tasks in parallel: I/O and nexus management.
+    let (s, r) = oneshot::channel();
+
+    let j0 = tokio::spawn({
+        let cfg = cfg.clone();
+        async move { run_io_task(s, cfg).await }
+    });
+    tokio::pin!(j0);
+
+    let j1 = tokio::spawn({
+        let cfg = cfg.clone();
+        async move {
+            run_manage_task(r, cfg).await;
+        }
+    });
+    tokio::pin!(j1);
+
+    let (io_res, _) = tokio::join!(j0, j1);
+    io_res.unwrap()
+}
+
+#[derive(Clone)]
+struct JobCfg {
+    ms_nex: SharedRpcHandle,
+    nex_0: NexusBuilder,
+    repl_0: ReplicaBuilder,
+    inj_w: String,
+    inj_r: String,
+}
+
+/// Runs multiple FIO I/O jobs.
+async fn run_io_task(s: Sender<()>, cfg: JobCfg) -> std::io::Result<()> {
+    let nvmf = cfg.nex_0.nvmf_location();
+    let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn);
+    let path = find_mayastor_nvme_device_path(&nvmf.serial)
+        .unwrap()
+        .to_str()
+        .unwrap()
+        .to_string();
+
+    let jobs = (0 .. 10).map(|_| {
+        FioJob::new()
+            .with_direct(true)
+            .with_ioengine("libaio")
+            .with_iodepth(128)
+            .with_filename(&path)
+            .with_runtime(20)
+            .with_rw("randwrite")
+    });
+
+    let fio = Fio::new().with_jobs(jobs);
+
+    // Notify the nexus management task that the connection is complete
+    // and I/O is about to start.
+    s.send(()).unwrap();
+
+    // Start FIO.
+    tokio::spawn(async move { fio.run() }).await.unwrap()
+}
+
+/// Manages the nexus in parallel to the I/O task.
+/// [1] The nexus is failed by injecting faults.
+/// [2] I/O running in parallel should freeze or fail, depending on how the
+/// target is configured.
+/// [3] The nexus is recreated.
+async fn run_manage_task(r: Receiver<()>, cfg: JobCfg) {
+    let JobCfg {
+        ms_nex,
+        inj_w,
+        inj_r,
+        mut nex_0,
+        repl_0,
+        ..
+    } = cfg;
+
+    // Wait until the I/O task connects and signals it is ready.
+    r.await.unwrap();
+
+    // Allow I/O to run for some time.
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    // Inject faults so that the nexus fails.
+    add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap();
+    add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap();
+
+    // When the nexus fails, I/O should freeze due to CRD (if enabled).
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    // Destroy the nexus, remove the injections, and re-create and re-publish
+    // the nexus with the same ID. I/O should eventually retry and run against
+    // the new nexus.
+    nex_0.shutdown().await.unwrap();
+    nex_0.destroy().await.unwrap();
+
+    remove_fault_injection(ms_nex.clone(), &inj_w)
+        .await
+        .unwrap();
+    remove_fault_injection(ms_nex.clone(), &inj_r)
+        .await
+        .unwrap();
+
+    let mut nex_0 = NexusBuilder::new(ms_nex.clone())
+        .with_name(NEXUS_NAME)
+        .with_uuid(NEXUS_UUID)
+        .with_size_mb(NEXUS_SIZE)
+        .with_replica(&repl_0);
+
+    nex_0.create().await.unwrap();
+    nex_0.publish().await.unwrap();
+}
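
A note on the new CRD knob (an illustration, not part of the patches): the
value is read from the NVMF_TGT_CRDT environment variable via
try_from_env("NVMF_TGT_CRDT", 0) and written to crdt[0] of the SPDK target
options. NVMe expresses CRDT in units of 100 ms, so the value 20 used by the
test above corresponds to a 2-second retry delay. The sketch below shows the
assumed semantics of the try_from_env helper (parse the variable, fall back
to the default when unset or unparsable); the helper's real signature in
io-engine may differ.

use std::{env, str::FromStr};

/// Assumed behaviour of the `try_from_env` helper used in opts.rs: return
/// the parsed value of the environment variable, or `default` when the
/// variable is unset or cannot be parsed. A sketch, not the actual
/// implementation.
fn try_from_env<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

fn main() {
    // With `NVMF_TGT_CRDT=20` exported (as the test does for the nexus
    // container), crdt[0] becomes 20, i.e. a 2 s delay in 100 ms units;
    // when the variable is absent, the default of 0 leaves CRD disabled.
    let c: u16 = try_from_env("NVMF_TGT_CRDT", 0);
    println!("NVMF_TGT_CRDT = {c}");
}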