Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nvmf): selection different CRD for replicas and certain nexus error #1524

Merged
merged 1 commit into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 139 additions & 46 deletions io-engine-tests/src/fio.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,51 @@
use super::file_io::DataSize;
use std::sync::atomic::{AtomicU32, Ordering};
use nix::errno::Errno;
use std::{
path::Path,
sync::atomic::{AtomicU32, Ordering},
time::{Duration, Instant},
};

/// Outcome of a single FIO job, as stored in the job's `result` field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FioJobResult {
/// No result has been recorded for the job yet.
NotRun,
/// FIO reported error code 0 for the job.
Ok,
/// FIO reported a non-zero error code for the job; the code is kept as an
/// `Errno`.
Error(Errno),
}

/// TODO
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct FioJob {
/// Job counter.
counter: u32,
pub counter: u32,
/// Job name.
name: String,
pub name: String,
/// I/O engine to use. Default: spdk.
ioengine: String,
pub ioengine: String,
/// Filename.
filename: String,
pub filename: String,
/// Type of I/O pattern.
rw: String,
pub rw: String,
/// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
direct: bool,
pub direct: bool,
/// Block size for I/O units. Default: 4k.
blocksize: Option<u32>,
pub blocksize: Option<u32>,
/// Offset in the file to start I/O. Data before the offset will not be
/// touched.
offset: Option<DataSize>,
pub offset: Option<DataSize>,
/// Number of I/O units to keep in flight against the file.
iodepth: Option<u32>,
pub iodepth: Option<u32>,
/// Number of clones (processes/threads performing the same workload) of
/// this job. Default: 1.
numjobs: u32,
pub numjobs: u32,
/// Terminate processing after the specified number of seconds.
runtime: Option<u32>,
pub runtime: Option<u32>,
/// Total size of I/O for this job.
size: Option<DataSize>,
pub size: Option<DataSize>,
/// Run result.
pub result: FioJobResult,
}

impl Default for FioJob {
Expand Down Expand Up @@ -58,6 +73,7 @@ impl FioJob {
numjobs: 1,
runtime: None,
size: None,
result: FioJobResult::NotRun,
}
}

Expand Down Expand Up @@ -100,6 +116,12 @@ impl FioJob {
r
}

/// Replaces the job name with `v` and hands the job back for chaining.
pub fn with_name(mut self, v: &str) -> Self {
    self.name = v.to_owned();
    self
}

/// I/O engine to use. Default: spdk.
pub fn with_ioengine(mut self, v: &str) -> Self {
self.ioengine = v.to_string();
Expand All @@ -112,6 +134,12 @@ impl FioJob {
self
}

/// Sets the filename from a filesystem path.
///
/// # Panics
///
/// Panics if the path is not valid UTF-8: the filename is stored as a
/// `String`, so such a path cannot be represented.
pub fn with_filename_path(mut self, v: impl AsRef<Path>) -> Self {
    let path = v.as_ref();

    // Fail loudly and name the offending path rather than panicking on a
    // bare `unwrap()`.
    self.filename = match path.to_str() {
        Some(s) => s.to_string(),
        None => panic!(
            "FIO filename path is not valid UTF-8: {}",
            path.display()
        ),
    };
    self
}

/// Read-write FIO mode.
pub fn with_rw(mut self, rw: &str) -> Self {
self.rw = rw.to_string();
Expand Down Expand Up @@ -170,6 +198,10 @@ pub struct Fio {
pub jobs: Vec<FioJob>,
pub verbose: bool,
pub verbose_err: bool,
pub script: String,
pub total_time: Duration,
pub exit: i32,
pub err_messages: Vec<String>,
}

impl Fio {
Expand Down Expand Up @@ -197,7 +229,7 @@ impl Fio {
self
}

pub fn run(&self) -> std::io::Result<()> {
pub fn run(mut self) -> Self {
let cmd = "sudo -E LD_PRELOAD=$FIO_SPDK fio";

let args = self
Expand All @@ -207,49 +239,110 @@ impl Fio {
.collect::<Vec<_>>()
.join(" ");

let script = format!("{cmd} {args}");
self.script = format!("{cmd} --output-format=json {args}");

if self.verbose {
println!("{script}");
if self.verbose || self.verbose_err {
println!("{}", self.script);
}

let start_time = Instant::now();
let (exit, stdout, stderr) = run_script::run(
&script,
&self.script,
&Vec::new(),
&run_script::ScriptOptions::new(),
)
.unwrap();

if exit == 0 {
if self.verbose {
println!("FIO:");
println!("{script}");
println!("Output:");
println!("{stdout}");
}

Ok(())
} else {
if self.verbose_err {
println!("Error running FIO:");
println!("{script}");
println!("Exit code: {exit}");
println!("Output:");
println!("{stdout}");
println!("Error output:");
println!("{stderr}");
}

Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("SPDK FIO error: {stderr}"),
))
self.total_time = start_time.elapsed();
self.push_err(&stderr);
self.exit = exit;

if let Err(e) = self.update_result(&stdout) {
self.push_err(&e);
}

if self.verbose_err {
println!(
"Error(s) running FIO: {s}",
s = self.err_messages.join("\n")
);
}

self
}

/// Appends `msg` to the collected error messages after stripping trailing
/// newlines. Messages that end up empty are not recorded.
fn push_err(&mut self, msg: &str) {
    let trimmed = msg.trim_end_matches('\n');
    if trimmed.is_empty() {
        return;
    }
    self.err_messages.push(trimmed.to_owned());
}

/// TODO
fn update_result(&mut self, out: &str) -> Result<(), String> {
// Filter out lines error messages, those starting with "fio: ".
let out = out
.split('\n')
.filter(|s| !s.starts_with("fio: "))
.collect::<Vec<_>>()
.join("\n");

serde_json::from_str::<serde_json::Value>(&out)
.map_err(|e| e.to_string())?
.get("jobs")
.ok_or_else(|| "No 'jobs' item in output".to_string())?
.as_array()
.ok_or_else(|| "'jobs' item in output is not an array".to_string())?
.iter()
.for_each(|j| {
let name =
j.get("jobname").unwrap().as_str().unwrap().to_string();
let err = j.get("error").unwrap().as_i64().unwrap() as i32;

if let Some(j) = self.find_job_mut(&name) {
if err == 0 {
j.result = FioJobResult::Ok;
} else {
j.result = FioJobResult::Error(Errno::from_i32(err));
}
}
});

Ok(())
}

/// Returns a shared reference to the first job whose name equals `name`,
/// or `None` when no job has that name.
pub fn find_job(&self, name: &str) -> Option<&FioJob> {
    self.jobs.iter().find(|job| job.name.as_str() == name)
}

/// Returns a mutable reference to the first job whose name equals `name`,
/// or `None` when no job has that name.
pub fn find_job_mut(&mut self, name: &str) -> Option<&mut FioJob> {
    self.jobs.iter_mut().find(|job| job.name.as_str() == name)
}
}

/// TODO
pub async fn run_fio_jobs(fio: &Fio) -> std::io::Result<()> {
let fio = fio.clone();
tokio::spawn(async move { fio.run() }).await.unwrap()
/// Runs a clone of the given FIO on tokio's blocking thread pool and waits
/// for it to finish. Any FIO error is converted into an `std::io::Result`.
///
/// # Errors
///
/// Returns an `std::io::Error` of kind `Other` when FIO's exit code is
/// non-zero; the message contains the exit code and the collected error
/// messages.
///
/// # Panics
///
/// Panics if the task running FIO panicked or was cancelled.
pub async fn spawn_fio_task(fio: &Fio) -> std::io::Result<()> {
    let fio = fio.clone();

    // `Fio::run` is synchronous and blocks until the FIO script returns, so
    // it is run on the blocking pool rather than on an async worker thread.
    let fio = tokio::task::spawn_blocking(move || fio.run())
        .await
        .expect("FIO task panicked or was cancelled");

    if fio.exit == 0 {
        return Ok(());
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        format!(
            "SPDK FIO error: {exit} {err_msg}",
            exit = fio.exit,
            err_msg = fio.err_messages.join("\n")
        ),
    ))
}
21 changes: 20 additions & 1 deletion io-engine-tests/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use super::{
DestroyNexusRequest,
ListNexusOptions,
Nexus,
NexusNvmePreemption,
NvmeReservation,
PublishNexusRequest,
RebuildHistoryRecord,
RebuildHistoryRequest,
Expand Down Expand Up @@ -46,6 +48,7 @@ pub struct NexusBuilder {
resv_key: u64,
preempt_key: u64,
resv_type: Option<i32>,
preempt_policy: i32,
children: Option<Vec<String>>,
nexus_info_key: Option<String>,
serial: Option<String>,
Expand All @@ -63,6 +66,7 @@ impl NexusBuilder {
resv_key: 1,
preempt_key: 0,
resv_type: None,
preempt_policy: 0,
children: None,
nexus_info_key: None,
serial: None,
Expand Down Expand Up @@ -120,6 +124,21 @@ impl NexusBuilder {
self.with_bdev(&r.bdev())
}

/// Sets the `resv_key` value held by this builder and returns the builder
/// for chaining.
pub fn with_resv_key(mut self, r: u64) -> Self {
self.resv_key = r;
self
}

/// Sets the reservation type held by this builder and returns the builder
/// for chaining. The value is stored as `Some` of its `i32` representation.
pub fn with_resv_type(mut self, r: NvmeReservation) -> Self {
self.resv_type = Some(r as i32);
self
}

/// Sets the preemption policy held by this builder and returns the builder
/// for chaining. The value is stored as its `i32` representation.
pub fn with_preempt_policy(mut self, r: NexusNvmePreemption) -> Self {
self.preempt_policy = r as i32;
self
}

fn replica_uri(&self, r: &ReplicaBuilder) -> String {
if r.rpc() == self.rpc() {
r.bdev()
Expand Down Expand Up @@ -174,7 +193,7 @@ impl NexusBuilder {
children: self.children.as_ref().unwrap().clone(),
nexus_info_key: self.nexus_info_key.as_ref().unwrap().clone(),
resv_type: self.resv_type,
preempt_policy: 0,
preempt_policy: self.preempt_policy,
})
.await
.map(|r| r.into_inner().nexus.unwrap())
Expand Down
6 changes: 3 additions & 3 deletions io-engine-tests/src/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf};

use super::{
file_io::{compare_files, test_write_to_file, DataSize},
fio::{run_fio_jobs, Fio},
fio::{spawn_fio_task, Fio},
nexus::{make_nexus_nqn, make_nexus_serial},
nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
};
Expand Down Expand Up @@ -89,7 +89,7 @@ pub async fn test_fio_to_nvmf(
})
.collect();

run_fio_jobs(&fio).await
spawn_fio_task(&fio).await
}

/// TODO
Expand All @@ -111,5 +111,5 @@ pub async fn test_fio_to_nvmf_aio(
})
.collect();

run_fio_jobs(&fio).await
spawn_fio_task(&fio).await
}
8 changes: 6 additions & 2 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::{
subsys::NvmfSubsystem,
};

use crate::core::IoCompletionStatus;
use events_api::event::EventAction;
use spdk_rs::{
BdevIo,
Expand Down Expand Up @@ -261,6 +262,8 @@ pub struct Nexus<'n> {
pub(super) nexus_target: Option<NexusTarget>,
/// Indicates if the Nexus has an I/O device.
pub(super) has_io_device: bool,
/// Initiators.
initiators: parking_lot::Mutex<HashSet<String>>,
/// Information associated with the persisted NexusInfo structure.
pub(super) nexus_info: futures::lock::Mutex<PersistentNexusInfo>,
/// Nexus I/O subsystem.
Expand All @@ -271,10 +274,10 @@ pub struct Nexus<'n> {
pub(super) rebuild_history: parking_lot::Mutex<Vec<HistoryRecord>>,
/// Flag to control shutdown from I/O path.
pub(crate) shutdown_requested: AtomicCell<bool>,
/// Last child I/O error.
pub(super) last_error: IoCompletionStatus,
/// Prevent auto-Unpin.
_pin: PhantomPinned,
/// Initiators.
initiators: parking_lot::Mutex<HashSet<String>>,
}

impl<'n> Debug for Nexus<'n> {
Expand Down Expand Up @@ -379,6 +382,7 @@ impl<'n> Nexus<'n> {
event_sink: None,
rebuild_history: parking_lot::Mutex::new(Vec::new()),
shutdown_requested: AtomicCell::new(false),
last_error: IoCompletionStatus::Success,
_pin: Default::default(),
};

Expand Down
Loading