Skip to content

Commit

Permalink
Try #1491:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Aug 16, 2023
2 parents 47d0450 + 76d40a3 commit 61ef43c
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 9 deletions.
27 changes: 20 additions & 7 deletions io-engine-tests/src/fio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl FioJob {
let mut r = vec![
format!("--name={}", self.name),
format!("--ioengine={}", self.ioengine),
format!("--filename='{}'", self.filename),
format!("--filename={}", self.filename),
format!("--thread=1"),
format!("--direct={}", if self.direct { "1" } else { "0" }),
format!("--norandommap=1"),
Expand Down Expand Up @@ -112,6 +112,12 @@ impl FioJob {
self
}

/// Read-write FIO mode.
pub fn with_rw(mut self, rw: &str) -> Self {
self.rw = rw.to_string();
self
}

/// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
pub fn with_direct(mut self, v: bool) -> Self {
self.direct = v;
Expand Down Expand Up @@ -171,8 +177,8 @@ impl Fio {
Default::default()
}

pub fn with_jobs(mut self, jobs: Vec<FioJob>) -> Self {
self.jobs = jobs;
pub fn with_jobs(mut self, jobs: impl Iterator<Item = FioJob>) -> Self {
jobs.for_each(|j| self.jobs.push(j));
self
}

Expand Down Expand Up @@ -201,12 +207,12 @@ impl Fio {
.collect::<Vec<_>>()
.join(" ");

let script = format!("{cmd} {args}");

if self.verbose {
println!("{cmd} {args}");
println!("{script}");
}

let script = format!("{cmd} {args}");

let (exit, stdout, stderr) = run_script::run(
&script,
&Vec::new(),
Expand All @@ -215,11 +221,18 @@ impl Fio {
.unwrap();

if exit == 0 {
if self.verbose {
println!("FIO:");
println!("{script}");
println!("Output:");
println!("{stdout}");
}

Ok(())
} else {
if self.verbose_err {
println!("Error running FIO:");
println!("{cmd} {args}");
println!("{script}");
println!("Exit code: {exit}");
println!("Output:");
println!("{stdout}");
Expand Down
13 changes: 13 additions & 0 deletions io-engine-tests/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use super::{
RebuildHistoryRecord,
RebuildHistoryRequest,
RemoveChildNexusRequest,
ShutdownNexusRequest,
},
SharedRpcHandle,
Status,
Expand Down Expand Up @@ -179,6 +180,18 @@ impl NexusBuilder {
.map(|r| r.into_inner().nexus.unwrap())
}

/// Shuts down the nexus identified by this builder's UUID via the
/// `ShutdownNexusRequest` gRPC call.
///
/// The response payload is discarded; only success/failure is reported.
pub async fn shutdown(&mut self) -> Result<(), Status> {
self.rpc()
.lock()
.await
.nexus
.shutdown_nexus(ShutdownNexusRequest {
uuid: self.uuid(),
})
.await
.map(|_| ())
}

pub async fn destroy(&mut self) -> Result<(), Status> {
self.rpc()
.lock()
Expand Down
2 changes: 1 addition & 1 deletion io-engine-tests/src/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub async fn test_fio_to_nvmf(
nvmf: &NvmfLocation,
mut fio: Fio,
) -> std::io::Result<()> {
let tgt = nvmf.as_args().join(" ");
let tgt = format!("'{}'", nvmf.as_args().join(" "));

fio.jobs = fio
.jobs
Expand Down
7 changes: 7 additions & 0 deletions io-engine/src/subsys/config/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ impl From<NvmfTgtConfig> for Box<spdk_nvmf_target_opts> {
let mut out = Self::default();
copy_str_with_null(&o.name, &mut out.name);
out.max_subsystems = o.max_namespaces;
let c = try_from_env("NVMF_TGT_CRDT", 0);
if c > 0 {
out.crdt[0] = c;
info!("NVMF target Command Retry Delay (CRD) is {c} sec");
} else {
info!("NVMF target Command Retry Delay (CRD) is disabled");
}
out
}
}
Expand Down
14 changes: 13 additions & 1 deletion io-engine/src/subsys/nvmf/subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use spdk_rs::libspdk::{
spdk_nvmf_subsystem_listener_get_trid,
spdk_nvmf_subsystem_pause,
spdk_nvmf_subsystem_remove_host,
spdk_nvmf_subsystem_remove_ns,
spdk_nvmf_subsystem_resume,
spdk_nvmf_subsystem_set_allow_any_host,
spdk_nvmf_subsystem_set_ana_reporting,
Expand Down Expand Up @@ -185,7 +186,7 @@ impl NvmfSubsystem {
ss.set_ana_reporting(false)?;
ss.allow_any(false);
if let Err(e) = ss.add_namespace(bdev, ptpl) {
ss.destroy();
ss.destroy_subsys();
return Err(e);
}
Ok(ss)
Expand Down Expand Up @@ -537,6 +538,17 @@ impl NvmfSubsystem {

/// Destroys the subsystem: first removes its namespace, then tears down
/// the underlying SPDK subsystem object via `destroy_subsys()`.
///
/// Returns the error code of the final destroy step (0 on success). A
/// failure to remove the namespace is logged but does not abort the
/// destruction.
pub fn destroy(&self) -> i32 {
unsafe {
// NOTE(review): hard-codes nsid 1 — assumes the subsystem holds a
// single namespace (the one added in add_namespace); confirm if
// multi-namespace subsystems become possible.
if spdk_nvmf_subsystem_remove_ns(self.0.as_ptr(), 1) != 0 {
error!(?self, "failed to remove namespace while destroying");
}
}

self.destroy_subsys()
}

/// destroy the SPDK object for subsystem
fn destroy_subsys(&self) -> i32 {
unsafe {
if (*self.0.as_ptr()).destroying {
warn!("Subsystem destruction already started");
Expand Down
234 changes: 234 additions & 0 deletions io-engine/tests/nexus_fail_crd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
pub mod common;

use std::{sync::Arc, time::Duration};
use tokio::sync::{
oneshot,
oneshot::{Receiver, Sender},
};

use common::{
compose::{
rpc::v1::{GrpcConnect, SharedRpcHandle},
Binary,
Builder,
},
fio::{Fio, FioJob},
nexus::NexusBuilder,
nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
pool::PoolBuilder,
replica::ReplicaBuilder,
test::{add_fault_injection, remove_fault_injection},
};

const POOL_SIZE: u64 = 500;
const REPL_SIZE: u64 = 450;
const NEXUS_NAME: &str = "nexus_0";
const NEXUS_SIZE: u64 = REPL_SIZE;
const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760";

/// Tests that without CRD enabled, the initiator would eventually fail
/// I/Os when the nexus is failed and re-created underneath it.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn nexus_fail_no_crd() {
test_nexus_fail(false)
.await
.expect_err("I/O expected to fail");
}

/// Tests that CRD properly delays I/O retries on the initiator, giving the
/// target a chance to replace a failed nexus, so that I/O ultimately
/// succeeds.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn nexus_fail_crd() {
test_nexus_fail(true)
.await
.expect("I/O expected to succeed");
}

/// Common body of the two tests above.
///
/// Topology: container `ms_0` hosts a pool with one shared replica;
/// container `ms_nex` hosts the nexus built on that replica. An I/O task
/// runs FIO against the published nexus while a management task injects
/// faults, shuts the nexus down and re-creates it under the same name and
/// UUID.
///
/// `crd == true` starts the nexus container with `NVMF_TGT_CRDT=20`,
/// enabling the NVMF target's Command Retry Delay (see
/// io-engine/src/subsys/config/opts.rs).
async fn test_nexus_fail(crd: bool) -> std::io::Result<()> {
common::composer_init();

// Nexus-hosting io-engine, with verbose nexus I/O logging for debugging.
let nex_bin = Binary::from_dbg("io-engine")
.with_env(
"RUST_LOG",
"debug,io_engine::bdev::nexus::nexus_io=trace,h2=info",
)
.with_args(vec!["-l", "1,2,3,4", "-Fcolor,compact,nodate"]);

// Enable the target-side Command Retry Delay only for the CRD variant.
let nex_bin = if crd {
nex_bin.with_env("NVMF_TGT_CRDT", "20")
} else {
nex_bin
};

let test = Builder::new()
.name("cargo-test")
.network("10.1.0.0/16")
.unwrap()
.add_container_bin(
"ms_0",
Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]),
)
.add_container_bin("ms_nex", nex_bin)
.with_clean(true)
.build()
.await
.unwrap();

let test = Arc::new(test);

let conn = GrpcConnect::new(&test);

let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap();
let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap();

// Pool and replica live on ms_0; the replica is shared so the nexus on
// ms_nex can consume it.
let mut pool_0 = PoolBuilder::new(ms_0.clone())
.with_name("pool0")
.with_new_uuid()
.with_malloc("mem0", POOL_SIZE);

let mut repl_0 = ReplicaBuilder::new(ms_0.clone())
.with_pool(&pool_0)
.with_name("r0")
.with_new_uuid()
.with_size_mb(REPL_SIZE)
.with_thin(false);

pool_0.create().await.unwrap();
repl_0.create().await.unwrap();
repl_0.share().await.unwrap();

let mut nex_0 = NexusBuilder::new(ms_nex.clone())
.with_name(NEXUS_NAME)
.with_uuid(NEXUS_UUID)
.with_size_mb(NEXUS_SIZE)
.with_replica(&repl_0);

nex_0.create().await.unwrap();
nex_0.publish().await.unwrap();

// Device name of the (only) nexus child, needed to build injection URIs.
let children = nex_0.get_nexus().await.unwrap().children;
let dev_name = children[0].device_name.as_ref().unwrap();

// Fault injections for writes and reads at offset 0 on the nexus domain.
let inj = "domain=nexus&op=write&offset=0";
let inj_w = format!("inject://{dev_name}?{inj}");

let inj = "domain=nexus&op=read&offset=0";
let inj_r = format!("inject://{dev_name}?{inj}");

let cfg = JobCfg {
ms_nex: ms_nex.clone(),
nex_0: nex_0.clone(),
repl_0: repl_0.clone(),
inj_w: inj_w.clone(),
inj_r: inj_r.clone(),
};

// Run two tasks in parallel, I/O and nexus management.
let (s, r) = oneshot::channel();

let j0 = tokio::spawn({
let cfg = cfg.clone();
async move { run_io_task(s, cfg).await }
});
tokio::pin!(j0);

let j1 = tokio::spawn({
let cfg = cfg.clone();
async move {
run_manage_task(r, cfg).await;
}
});
tokio::pin!(j1);

// The test outcome is the I/O task's result; the management task unwraps
// its own errors internally.
let (io_res, _) = tokio::join!(j0, j1);
io_res.unwrap()
}

/// Configuration shared between the I/O task and the nexus management task.
#[derive(Clone)]
struct JobCfg {
/// gRPC handle to the nexus-hosting io-engine container.
ms_nex: SharedRpcHandle,
/// Builder for the nexus under test.
nex_0: NexusBuilder,
/// Builder for the replica backing the nexus.
repl_0: ReplicaBuilder,
/// Fault-injection URI for writes (`inject://...op=write...`).
inj_w: String,
/// Fault-injection URI for reads (`inject://...op=read...`).
inj_r: String,
}

/// Runs multiple FIO I/O jobs against the published nexus.
///
/// Connects to the nexus over NVMe-oF (the guard presumably disconnects on
/// drop — confirm against NmveConnectGuard), resolves the resulting block
/// device path, builds 10 identical random-write jobs, signals `s` once
/// the connection is established, and then runs FIO to completion.
async fn run_io_task(s: Sender<()>, cfg: JobCfg) -> std::io::Result<()> {
let nvmf = cfg.nex_0.nvmf_location();
let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn);
let path = find_mayastor_nvme_device_path(&nvmf.serial)
.unwrap()
.to_str()
.unwrap()
.to_string();

// 10 parallel direct-I/O random-write jobs, 20 s each, queue depth 128.
let jobs = (0 .. 10).map(|_| {
FioJob::new()
.with_direct(true)
.with_ioengine("libaio")
.with_iodepth(128)
.with_filename(&path)
.with_runtime(20)
.with_rw("randwrite")
});

let fio = Fio::new().with_jobs(jobs);

// Notify the nexus management task that connection is complete and I/O
// starts.
s.send(()).unwrap();

// Start FIO.
// NOTE(review): fio.run() appears to be synchronous; consider
// tokio::task::spawn_blocking instead of spawn — confirm.
tokio::spawn(async move { fio.run() }).await.unwrap()
}

/// Manages the nexus in parallel to the I/O task.
///
/// [1] Nexus is failed by injecting read and write faults.
/// [2] I/O running in parallel should freeze or fail, depending on how the
/// target is configured (CRD enabled or not).
/// [3] Nexus is shut down, destroyed, and re-created with the same name
/// and UUID so that delayed initiator retries can land on the new one.
async fn run_manage_task(r: Receiver<()>, cfg: JobCfg) {
let JobCfg {
ms_nex,
inj_w,
inj_r,
mut nex_0,
repl_0,
..
} = cfg;

// Wait until I/O task connects and signals it is ready.
r.await.unwrap();

// Allow I/O to run for some time.
tokio::time::sleep(Duration::from_secs(2)).await;

// Inject fault, so the nexus would fail.
add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap();
add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap();

// When nexus fails, I/O should be freezing due to CRD (if enabled).
tokio::time::sleep(Duration::from_secs(2)).await;

// Destroy the nexus, remove injections and re-create and re-publish the
// nexus with the same ID.
// I/O would eventually retry and the new nexus would run I/O.
nex_0.shutdown().await.unwrap();
nex_0.destroy().await.unwrap();

remove_fault_injection(ms_nex.clone(), &inj_w)
.await
.unwrap();
remove_fault_injection(ms_nex.clone(), &inj_r)
.await
.unwrap();

// Fresh builder: the replacement nexus reuses NEXUS_NAME / NEXUS_UUID.
let mut nex_0 = NexusBuilder::new(ms_nex.clone())
.with_name(NEXUS_NAME)
.with_uuid(NEXUS_UUID)
.with_size_mb(NEXUS_SIZE)
.with_replica(&repl_0);

nex_0.create().await.unwrap();
nex_0.publish().await.unwrap();
}

0 comments on commit 61ef43c

Please sign in to comment.