Skip to content

Commit

Permalink
Merge #1524
Browse files Browse the repository at this point in the history
1524: feat(nvmf): selection different CRD for replicas and certain nexus error r=dsavitskiy a=dsavitskiy

NVMF subsystem will select different preconfigured CRDs for certain cases:
3 is selected for replica targets
2 is selected for ENOSPC and reservation errors on nexus targets
1 is select for all other error on nexus target

Co-authored-by: Dmitry Savitskiy <[email protected]>
  • Loading branch information
mayastor-bors and dsavitskiy committed Oct 28, 2023
2 parents d138a30 + f9e5a81 commit a7b0f05
Show file tree
Hide file tree
Showing 16 changed files with 1,093 additions and 317 deletions.
185 changes: 139 additions & 46 deletions io-engine-tests/src/fio.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,51 @@
use super::file_io::DataSize;
use std::sync::atomic::{AtomicU32, Ordering};
use nix::errno::Errno;
use std::{
path::Path,
sync::atomic::{AtomicU32, Ordering},
time::{Duration, Instant},
};

/// TODO
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FioJobResult {
NotRun,
Ok,
Error(Errno),
}

/// TODO
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct FioJob {
/// Job counter.
counter: u32,
pub counter: u32,
/// Job name.
name: String,
pub name: String,
/// I/O engine to use. Default: spdk.
ioengine: String,
pub ioengine: String,
/// Filename.
filename: String,
pub filename: String,
/// Type of I/O pattern.
rw: String,
pub rw: String,
/// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
direct: bool,
pub direct: bool,
/// Block size for I/O units. Default: 4k.
blocksize: Option<u32>,
pub blocksize: Option<u32>,
/// Offset in the file to start I/O. Data before the offset will not be
/// touched.
offset: Option<DataSize>,
pub offset: Option<DataSize>,
/// Number of I/O units to keep in flight against the file.
iodepth: Option<u32>,
pub iodepth: Option<u32>,
/// Number of clones (processes/threads performing the same workload) of
/// this job. Default: 1.
numjobs: u32,
pub numjobs: u32,
/// Terminate processing after the specified number of seconds.
runtime: Option<u32>,
pub runtime: Option<u32>,
/// Total size of I/O for this job.
size: Option<DataSize>,
pub size: Option<DataSize>,
/// Run result.
pub result: FioJobResult,
}

impl Default for FioJob {
Expand Down Expand Up @@ -58,6 +73,7 @@ impl FioJob {
numjobs: 1,
runtime: None,
size: None,
result: FioJobResult::NotRun,
}
}

Expand Down Expand Up @@ -100,6 +116,12 @@ impl FioJob {
r
}

/// Sets job name.
pub fn with_name(mut self, v: &str) -> Self {
self.name = v.to_string();
self
}

/// I/O engine to use. Default: spdk.
pub fn with_ioengine(mut self, v: &str) -> Self {
self.ioengine = v.to_string();
Expand All @@ -112,6 +134,12 @@ impl FioJob {
self
}

/// Filename.
pub fn with_filename_path(mut self, v: impl AsRef<Path>) -> Self {
self.filename = v.as_ref().to_str().unwrap().to_string();
self
}

/// Read-write FIO mode.
pub fn with_rw(mut self, rw: &str) -> Self {
self.rw = rw.to_string();
Expand Down Expand Up @@ -170,6 +198,10 @@ pub struct Fio {
pub jobs: Vec<FioJob>,
pub verbose: bool,
pub verbose_err: bool,
pub script: String,
pub total_time: Duration,
pub exit: i32,
pub err_messages: Vec<String>,
}

impl Fio {
Expand Down Expand Up @@ -197,7 +229,7 @@ impl Fio {
self
}

pub fn run(&self) -> std::io::Result<()> {
pub fn run(mut self) -> Self {
let cmd = "sudo -E LD_PRELOAD=$FIO_SPDK fio";

let args = self
Expand All @@ -207,49 +239,110 @@ impl Fio {
.collect::<Vec<_>>()
.join(" ");

let script = format!("{cmd} {args}");
self.script = format!("{cmd} --output-format=json {args}");

if self.verbose {
println!("{script}");
if self.verbose || self.verbose_err {
println!("{}", self.script);
}

let start_time = Instant::now();
let (exit, stdout, stderr) = run_script::run(
&script,
&self.script,
&Vec::new(),
&run_script::ScriptOptions::new(),
)
.unwrap();

if exit == 0 {
if self.verbose {
println!("FIO:");
println!("{script}");
println!("Output:");
println!("{stdout}");
}

Ok(())
} else {
if self.verbose_err {
println!("Error running FIO:");
println!("{script}");
println!("Exit code: {exit}");
println!("Output:");
println!("{stdout}");
println!("Error output:");
println!("{stderr}");
}

Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("SPDK FIO error: {stderr}"),
))
self.total_time = start_time.elapsed();
self.push_err(&stderr);
self.exit = exit;

if let Err(e) = self.update_result(&stdout) {
self.push_err(&e);
}

if self.verbose_err {
println!(
"Error(s) running FIO: {s}",
s = self.err_messages.join("\n")
);
}

self
}

/// TODO
fn push_err(&mut self, msg: &str) {
let s = msg.trim_end_matches('\n');
if !s.is_empty() {
self.err_messages.push(s.to_string());
}
}

/// TODO
fn update_result(&mut self, out: &str) -> Result<(), String> {
// Filter out lines error messages, those starting with "fio: ".
let out = out
.split('\n')
.filter(|s| !s.starts_with("fio: "))
.collect::<Vec<_>>()
.join("\n");

serde_json::from_str::<serde_json::Value>(&out)
.map_err(|e| e.to_string())?
.get("jobs")
.ok_or_else(|| "No 'jobs' item in output".to_string())?
.as_array()
.ok_or_else(|| "'jobs' item in output is not an array".to_string())?
.iter()
.for_each(|j| {
let name =
j.get("jobname").unwrap().as_str().unwrap().to_string();
let err = j.get("error").unwrap().as_i64().unwrap() as i32;

if let Some(j) = self.find_job_mut(&name) {
if err == 0 {
j.result = FioJobResult::Ok;
} else {
j.result = FioJobResult::Error(Errno::from_i32(err));
}
}
});

Ok(())
}

/// TODO
pub fn find_job(&self, name: &str) -> Option<&FioJob> {
self.jobs.iter().find(|j| j.name == name)
}

/// TODO
pub fn find_job_mut(&mut self, name: &str) -> Option<&mut FioJob> {
self.jobs.iter_mut().find(|j| j.name == name)
}
}

/// TODO
pub async fn run_fio_jobs(fio: &Fio) -> std::io::Result<()> {
let fio = fio.clone();
tokio::spawn(async move { fio.run() }).await.unwrap()
/// Spawns a tokio task and runs the given FIO on it. Any FIO error is converted
/// into an `std::io::Result`.
pub async fn spawn_fio_task(fio: &Fio) -> std::io::Result<()> {
let fio = tokio::spawn({
let fio = fio.clone();
async move { fio.run() }
})
.await
.unwrap();

if fio.exit == 0 {
Ok(())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"SPDK FIO error: {exit} {err_msg}",
exit = fio.exit,
err_msg = fio.err_messages.join("\n")
),
))
}
}
21 changes: 20 additions & 1 deletion io-engine-tests/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use super::{
DestroyNexusRequest,
ListNexusOptions,
Nexus,
NexusNvmePreemption,
NvmeReservation,
PublishNexusRequest,
RebuildHistoryRecord,
RebuildHistoryRequest,
Expand Down Expand Up @@ -46,6 +48,7 @@ pub struct NexusBuilder {
resv_key: u64,
preempt_key: u64,
resv_type: Option<i32>,
preempt_policy: i32,
children: Option<Vec<String>>,
nexus_info_key: Option<String>,
serial: Option<String>,
Expand All @@ -63,6 +66,7 @@ impl NexusBuilder {
resv_key: 1,
preempt_key: 0,
resv_type: None,
preempt_policy: 0,
children: None,
nexus_info_key: None,
serial: None,
Expand Down Expand Up @@ -120,6 +124,21 @@ impl NexusBuilder {
self.with_bdev(&r.bdev())
}

pub fn with_resv_key(mut self, r: u64) -> Self {
self.resv_key = r;
self
}

pub fn with_resv_type(mut self, r: NvmeReservation) -> Self {
self.resv_type = Some(r as i32);
self
}

pub fn with_preempt_policy(mut self, r: NexusNvmePreemption) -> Self {
self.preempt_policy = r as i32;
self
}

fn replica_uri(&self, r: &ReplicaBuilder) -> String {
if r.rpc() == self.rpc() {
r.bdev()
Expand Down Expand Up @@ -174,7 +193,7 @@ impl NexusBuilder {
children: self.children.as_ref().unwrap().clone(),
nexus_info_key: self.nexus_info_key.as_ref().unwrap().clone(),
resv_type: self.resv_type,
preempt_policy: 0,
preempt_policy: self.preempt_policy,
})
.await
.map(|r| r.into_inner().nexus.unwrap())
Expand Down
6 changes: 3 additions & 3 deletions io-engine-tests/src/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf};

use super::{
file_io::{compare_files, test_write_to_file, DataSize},
fio::{run_fio_jobs, Fio},
fio::{spawn_fio_task, Fio},
nexus::{make_nexus_nqn, make_nexus_serial},
nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
};
Expand Down Expand Up @@ -89,7 +89,7 @@ pub async fn test_fio_to_nvmf(
})
.collect();

run_fio_jobs(&fio).await
spawn_fio_task(&fio).await
}

/// TODO
Expand All @@ -111,5 +111,5 @@ pub async fn test_fio_to_nvmf_aio(
})
.collect();

run_fio_jobs(&fio).await
spawn_fio_task(&fio).await
}
8 changes: 6 additions & 2 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::{
subsys::NvmfSubsystem,
};

use crate::core::IoCompletionStatus;
use events_api::event::EventAction;
use spdk_rs::{
BdevIo,
Expand Down Expand Up @@ -261,6 +262,8 @@ pub struct Nexus<'n> {
pub(super) nexus_target: Option<NexusTarget>,
/// Indicates if the Nexus has an I/O device.
pub(super) has_io_device: bool,
/// Initiators.
initiators: parking_lot::Mutex<HashSet<String>>,
/// Information associated with the persisted NexusInfo structure.
pub(super) nexus_info: futures::lock::Mutex<PersistentNexusInfo>,
/// Nexus I/O subsystem.
Expand All @@ -271,10 +274,10 @@ pub struct Nexus<'n> {
pub(super) rebuild_history: parking_lot::Mutex<Vec<HistoryRecord>>,
/// Flag to control shutdown from I/O path.
pub(crate) shutdown_requested: AtomicCell<bool>,
/// Last child I/O error.
pub(super) last_error: IoCompletionStatus,
/// Prevent auto-Unpin.
_pin: PhantomPinned,
/// Initiators.
initiators: parking_lot::Mutex<HashSet<String>>,
}

impl<'n> Debug for Nexus<'n> {
Expand Down Expand Up @@ -379,6 +382,7 @@ impl<'n> Nexus<'n> {
event_sink: None,
rebuild_history: parking_lot::Mutex::new(Vec::new()),
shutdown_requested: AtomicCell::new(false),
last_error: IoCompletionStatus::Success,
_pin: Default::default(),
};

Expand Down
Loading

0 comments on commit a7b0f05

Please sign in to comment.