diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index abe8cf077b..bf5b97186e 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -591,6 +591,7 @@ impl<'n> Nexus<'n> { let (sender, recv) = oneshot::channel::(); self.traverse_io_channels( + sender, |chan, _sender| -> ChannelTraverseStatus { chan.reconnect_all(); ChannelTraverseStatus::Ok @@ -599,7 +600,6 @@ impl<'n> Nexus<'n> { debug!("{self:?}: all I/O channels reconfigured"); sender.send(status).expect("reconfigure channel gone"); }, - sender, ); let result = recv.await.expect("reconfigure sender already dropped"); diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs index c016382b67..4737142514 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs @@ -838,12 +838,12 @@ impl<'n> Nexus<'n> { // and the I/O would end up logged. if has_io_log { self.traverse_io_channels( + (), |chan, _| { chan.reconnect_io_logs(); ChannelTraverseStatus::Ok }, |_, _| {}, - (), ); c.io_log_channel() @@ -1048,9 +1048,9 @@ impl<'n> Nexus<'n> { ); self.traverse_io_channels( + ctx, update_failfast_cb, update_failfast_done, - ctx, ); r.await diff --git a/io-engine/src/bdev/nvmx/channel.rs b/io-engine/src/bdev/nvmx/channel.rs index a68096e159..0f71e986d5 100644 --- a/io-engine/src/bdev/nvmx/channel.rs +++ b/io-engine/src/bdev/nvmx/channel.rs @@ -1,30 +1,13 @@ /* I/O channel for NVMe controller, one per core. */ -use std::{ - cmp::max, - mem::size_of, - os::raw::c_void, - ptr::NonNull, - time::Duration, -}; +use std::{mem::size_of, os::raw::c_void, ptr::NonNull, time::Duration}; use spdk_rs::{ libspdk::{ nvme_qpair_abort_all_queued_reqs, nvme_transport_qpair_abort_reqs, spdk_io_channel, - spdk_nvme_ctrlr_alloc_io_qpair, - spdk_nvme_ctrlr_connect_io_qpair, - spdk_nvme_ctrlr_disconnect_io_qpair, - spdk_nvme_ctrlr_free_io_qpair, - spdk_nvme_ctrlr_get_default_io_qpair_opts, - spdk_nvme_io_qpair_opts, - spdk_nvme_poll_group, - spdk_nvme_poll_group_add, - spdk_nvme_poll_group_create, - spdk_nvme_poll_group_destroy, spdk_nvme_poll_group_process_completions, - spdk_nvme_poll_group_remove, spdk_nvme_qpair, spdk_put_io_channel, }, @@ -32,30 +15,21 @@ use spdk_rs::{ PollerBuilder, }; -#[cfg(feature = "spdk-async-qpair-connect")] -use spdk_rs::libspdk::{ - spdk_nvme_ctrlr_connect_io_qpair_async, - spdk_nvme_ctrlr_io_qpair_connect_poll_async, -}; - -#[cfg(feature = "spdk-async-qpair-connect")] -use nix::errno::Errno; - use crate::{ - bdev::{ - device_lookup, - nvmx::{ - controller_inner::SpdkNvmeController, - nvme_bdev_running_config, - NvmeControllerState, - NVME_CONTROLLERS, - }, - }, - core::{BlockDevice, BlockDeviceIoStats, CoreError, IoType}, + bdev::device_lookup, + core::{BlockDevice, BlockDeviceIoStats, IoType}, }; -use futures::channel::oneshot; +use super::{ + nvme_bdev_running_config, + NvmeControllerState, + PollGroup, + QPair, + SpdkNvmeController, + NVME_CONTROLLERS, +}; +/// TODO #[repr(C)] pub struct NvmeIoChannel<'a> { inner: *mut NvmeIoChannelInner<'a>, @@ -87,396 +61,6 @@ impl<'a> NvmeIoChannel<'a> { } } -#[derive(Debug, Serialize, Clone, Copy, PartialEq, PartialOrd)] -pub enum QPairState { - Disconnected, - Disconnecting, - Connecting, - Connected, - Enabling, - Enabled, - Destroying, -} - -impl From for QPairState { - fn from(u: u8) -> Self { - match u { - 0 => Self::Disconnected, - 1 => Self::Disconnecting, - 2 => Self::Connecting, - 3 => Self::Connected, - 4 => Self::Enabling, - 5 => Self::Enabled, - 6 => Self::Destroying, - _ => panic!("qpair in a unknown state"), - } - } -} - -impl ToString for QPairState { - fn to_string(&self) -> String { - match *self { - QPairState::Disconnected => "Disconnected", - QPairState::Disconnecting => "Disconnecting", - QPairState::Connecting => "Connecting", - QPairState::Connected => "Connected", - QPairState::Enabling => "Enabling", - QPairState::Enabled => "Enabled", - QPairState::Destroying => "Destroying", - } - .parse() - .unwrap() - } -} - -#[derive(Debug)] -pub struct IoQpair { - /// TODO - qpair: NonNull, - /// TODO - ctrlr_handle: SpdkNvmeController, - /// TODO - state: QPairState, - /// Connection waiters for async qpair connection support. - #[allow(dead_code)] - connect_waiters: Vec>>, -} - -impl IoQpair { - fn get_default_options( - ctrlr_handle: SpdkNvmeController, - ) -> spdk_nvme_io_qpair_opts { - let mut opts = spdk_nvme_io_qpair_opts::default(); - let default_opts = nvme_bdev_running_config(); - - unsafe { - spdk_nvme_ctrlr_get_default_io_qpair_opts( - ctrlr_handle.as_ptr(), - &mut opts, - size_of::() as u64, - ) - }; - - opts.io_queue_requests = - max(opts.io_queue_requests, default_opts.io_queue_requests); - opts.create_only = true; - - // Always assume async_mode is enabled instread of - // relying on default_opts.async_mode. - opts.async_mode = true; - - opts - } - - /// Create a qpair with default options for target NVMe controller. - fn create( - ctrlr_handle: SpdkNvmeController, - ctrlr_name: &str, - ) -> Result { - //assert!(!ctrlr_handle.is_null(), "controller handle is null"); - - let qpair_opts = IoQpair::get_default_options(ctrlr_handle); - - let qpair: *mut spdk_nvme_qpair = unsafe { - spdk_nvme_ctrlr_alloc_io_qpair( - ctrlr_handle.as_ptr(), - &qpair_opts, - size_of::() as u64, - ) - }; - - if let Some(q) = NonNull::new(qpair) { - trace!(?qpair, ?ctrlr_name, "qpair created for controller"); - Ok(Self { - qpair: q, - ctrlr_handle, - state: QPairState::Disconnected, - connect_waiters: Vec::new(), - }) - } else { - error!(?ctrlr_name, "Failed to allocate I/O qpair for controller",); - Err(CoreError::GetIoChannel { - name: ctrlr_name.to_string(), - }) - } - } - - /// Get SPDK qpair object. - pub fn as_ptr(&self) -> *mut spdk_nvme_qpair { - self.qpair.as_ptr() - } - - /// Synchronously connect qpair. - pub(crate) fn connect(&mut self) -> i32 { - trace!(?self, "connecting I/O qpair"); - - // Check if I/O qpair is already connected to provide idempotency for - // multiple allocations of the same handle for the same thread, to make - // sure we don't reconnect every time. - if self.state == QPairState::Connected { - return 0; - } - - // During synchronous connection we shouldn't be preemped by any other - // SPDK thread, so we can't see QPairState::Connecting. - assert_eq!( - self.state, - QPairState::Disconnected, - "Insufficient QPair state" - ); - - // Mark qpair as being connected and try to connect. - let status = unsafe { - spdk_nvme_ctrlr_connect_io_qpair( - self.ctrlr_handle.as_ptr(), - self.qpair.as_ptr(), - ) - }; - - // Update QPairState according to the connection result. - self.state = if status == 0 { - QPairState::Connected - } else { - QPairState::Disconnected - }; - - trace!(?self, ?status, state=?self.state,"I/O qpair connected"); - - status - } - - #[cfg(feature = "spdk-async-qpair-connect")] - /// Asynchronously connect qpair. - pub(crate) async fn connect_async(&mut self) -> Result<(), CoreError> { - // Check if I/O qpair is already connected to provide idempotency for - // multiple allocations of the same handle for the same thread, to make - // sure we don't reconnect every time. - if self.state == QPairState::Connected { - return Ok(()); - } - - // Take into account other connect requests for this I/O qpair to avoid - // multiple concurrent connections. - match self.state { - QPairState::Disconnected => { - self.state = QPairState::Connecting; - } - QPairState::Connecting => { - let (sender, receiver) = - oneshot::channel::>(); - - self.connect_waiters.push(sender); - - let r = receiver - .await - .expect("I/O qpair connection sender disappeared"); - return r; - } - _ => { - panic!("QPair is in insufficient state: {:?}", self.state); - } - } - - let (sender, receiver) = - oneshot::channel::>(); - - let connect_arg = Box::into_raw(Box::new(IoQpairConnectContext { - sender: Some(sender), - poller: None, - })); - - let connect_ctx = unsafe { - spdk_nvme_ctrlr_connect_io_qpair_async( - self.ctrlr_handle.as_ptr(), - self.qpair.as_ptr(), - Some(qpair_connect_cb), - connect_arg as *mut c_void, - ) - }; - - if connect_ctx.is_null() { - error!(qpair=?self, "Failed to initiate asynchronous connection on a qpair"); - return Err(CoreError::OpenBdev { - source: Errno::ENXIO, - }); - } - - let qpair = self.qpair.as_ptr(); - - let poller = PollerBuilder::new() - .with_name("io_qpair_connect_poller") - .with_interval(Duration::from_millis(1)) - .with_data(()) - .with_poll_fn(move |_| unsafe { - let ctx = &mut *connect_arg; - - let st = spdk_nvme_ctrlr_io_qpair_connect_poll_async( - qpair, - connect_ctx, - ); - - match st { - // Connection complete, callback is called. - 0 => 1, - // Connection still in progress, keep polling. - 1 => 0, - // Error occured during polling. - errno => { - error!(?qpair, ?errno, "I/O qpair connection failed"); - - // Stop the poller and notify the listener. - ctx.poller.take(); - ctx.sender - .take() - .expect("No qpair connection sender provided") - .send(Err(Errno::from_i32(errno))) - .expect("Failed to notify I/O qpair connection listener"); - 1 - } - } - }) - .build(); - - unsafe { - (*connect_arg).poller = Some(poller); - } - - let r = receiver - .await - .expect("I/O qpair connection sender disappeared") - .map_err(|e| CoreError::OpenBdev { - source: e, - }); - - // Update QPairState according to the connection result. - self.state = if r.is_ok() { - QPairState::Connected - } else { - QPairState::Disconnected - }; - - // Wake up all other callers waiting for connection to complete. - if !self.connect_waiters.is_empty() { - let waiters: Vec>> = - self.connect_waiters.drain(..).collect(); - trace!( - ?qpair, - waiters = waiters.len(), - "Notifying connection waiters" - ); - for w in waiters { - w.send(r.clone()) - .expect("Failed to notify a connection waiter"); - } - trace!(?qpair, "All connection waiters are notified"); - } - - // Drop context object transformed previously into a raw pointer. - unsafe { - drop(Box::from_raw(connect_arg)); - } - - r - } -} - -#[cfg(feature = "spdk-async-qpair-connect")] -struct IoQpairConnectContext<'poller> { - sender: Option>>, - poller: Option>, -} - -#[cfg(feature = "spdk-async-qpair-connect")] -extern "C" fn qpair_connect_cb( - qpair: *mut spdk_nvme_qpair, - cb_ctx: *mut c_void, -) { - trace!(?qpair, "I/O qpair successfully connected"); - - let connect_ctx = - unsafe { &mut *(cb_ctx as *const _ as *mut IoQpairConnectContext) }; - - // Stop the poller. - connect_ctx.poller.take(); - - // Notify the listener. - connect_ctx - .sender - .take() - .expect("No qpair connection sender provided") - .send(Ok(())) - .expect("Failed to notify I/O qpair connection listener"); -} - -struct PollGroup(NonNull); - -impl PollGroup { - /// Create a poll group. - fn create(ctx: *mut c_void, ctrlr_name: &str) -> Result { - let poll_group: *mut spdk_nvme_poll_group = - unsafe { spdk_nvme_poll_group_create(ctx, std::ptr::null_mut()) }; - - if poll_group.is_null() { - Err(CoreError::GetIoChannel { - name: ctrlr_name.to_string(), - }) - } else { - Ok(Self(NonNull::new(poll_group).unwrap())) - } - } - - /// Add I/O qpair to poll group. - fn add_qpair(&mut self, qpair: &IoQpair) -> i32 { - unsafe { spdk_nvme_poll_group_add(self.0.as_ptr(), qpair.as_ptr()) } - } - - /// Remove I/O qpair to poll group. - fn remove_qpair(&mut self, qpair: &IoQpair) -> i32 { - unsafe { spdk_nvme_poll_group_remove(self.0.as_ptr(), qpair.as_ptr()) } - } - - /// Get SPDK handle for poll group. - #[inline] - fn as_ptr(&self) -> *mut spdk_nvme_poll_group { - self.0.as_ptr() - } -} - -impl Drop for PollGroup { - fn drop(&mut self) { - trace!("dropping poll group {:p}", self.0.as_ptr()); - let rc = unsafe { spdk_nvme_poll_group_destroy(self.0.as_ptr()) }; - if rc < 0 { - error!("Error on poll group destroy: {}", rc); - } - trace!("poll group {:p} successfully dropped", self.0.as_ptr()); - } -} - -/// spdk_nvme_ctrlr_free_io_qpair() calls disconnected. So we can either -/// a. NOT call disconnect here -/// and have SPDK disconnect it. -/// b. set the ptr to null, as SPDK checks if the ptr is NULL. However, that -/// breaks the contract with NonNull -impl Drop for IoQpair { - fn drop(&mut self) { - let qpair = self.qpair.as_ptr(); - - unsafe { - nvme_qpair_abort_all_queued_reqs(qpair, 1); - trace!(?qpair, "I/O requests successfully aborted,"); - nvme_transport_qpair_abort_reqs(qpair, 1); - trace!(?qpair, "transport requests successfully aborted,"); - spdk_nvme_ctrlr_disconnect_io_qpair(qpair); - trace!(?qpair, "qpair successfully disconnected,"); - spdk_nvme_ctrlr_free_io_qpair(qpair); - trace!(?qpair, "qpair successfully freed,"); - } - - trace!(?qpair, "qpair successfully dropped,"); - } -} - impl std::fmt::Debug for NvmeIoChannelInner<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("NvmeIoChannelInner") @@ -487,7 +71,7 @@ impl std::fmt::Debug for NvmeIoChannelInner<'_> { } pub struct NvmeIoChannelInner<'a> { - pub qpair: Option, + qpair: Option, poll_group: PollGroup, poller: Poller<'a>, io_stats_controller: IoStatsController, @@ -513,17 +97,48 @@ pub struct NvmeIoChannelInner<'a> { } impl NvmeIoChannelInner<'_> { + /// Returns SPDK pointer for the QPair. The QPair must exist. + #[inline(always)] + pub(crate) unsafe fn qpair_ptr(&mut self) -> *mut spdk_nvme_qpair { + self.qpair.as_mut().expect("QPair must exist").as_ptr() + } + + #[inline(always)] + pub(crate) fn qpair(&self) -> &Option { + &self.qpair + } + + #[inline(always)] + pub(crate) fn qpair_mut(&mut self) -> &mut Option { + &mut self.qpair + } + + fn remove_qpair(&mut self) -> Option { + if let Some(q) = &self.qpair { + trace!(qpair = ?q.as_ptr(), "removing qpair"); + } + self.qpair.take() + } + /// Reset channel, making it unusable till reinitialize() is called. pub fn reset(&mut self) -> i32 { - if self.qpair.is_some() { - // Remove qpair and trigger its deallocation via drop(). - let qpair = self.qpair.take().unwrap(); - trace!( - "dropping qpair {:p} ({}) I/O requests pending)", - qpair.as_ptr(), - self.num_pending_ios - ); + // Remove qpair and trigger its deallocation via drop(). + match self.remove_qpair() { + Some(qpair) => { + trace!( + "reset: dropping qpair {:p} ({}) I/O requests pending)", + qpair.as_ptr(), + self.num_pending_ios + ); + } + None => { + trace!( + "reset: no qpair ({}) I/O requests pending)", + self.num_pending_ios + ); + } } + 0 } @@ -578,8 +193,7 @@ impl NvmeIoChannelInner<'_> { // We assume that channel is reinitialized after being reset, so we // expect to see no I/O qpair. - let prev = self.qpair.take(); - if prev.is_some() { + if self.remove_qpair().is_some() { warn!( ?ctrlr_name, "I/O channel has active I/O qpair while being reinitialized, clearing" @@ -587,7 +201,7 @@ impl NvmeIoChannelInner<'_> { } // Create qpair for target controller. - let mut qpair = match IoQpair::create(ctrlr_handle, ctrlr_name) { + let qpair = match QPair::create(ctrlr_handle, ctrlr_name) { Ok(qpair) => qpair, Err(e) => { error!(?ctrlr_name, ?e, "Failed to allocate qpair,"); @@ -689,7 +303,7 @@ extern "C" fn disconnected_qpair_cb( ) { let inner = NvmeIoChannel::from_raw(ctx).inner_mut(); - if let Some(ref qpair) = inner.qpair { + if let Some(qpair) = inner.qpair() { unsafe { nvme_qpair_abort_all_queued_reqs(qpair.as_ptr(), 1); nvme_transport_qpair_abort_reqs(qpair.as_ptr(), 1); @@ -776,7 +390,7 @@ impl NvmeControllerIoChannel { }; // Allocate qpair. - let qpair = match IoQpair::create(controller, &cname) { + let qpair = match QPair::create(controller, &cname) { Ok(qpair) => qpair, Err(e) => { error!(?cname, ?e, "Failed to allocate qpair"); @@ -837,12 +451,14 @@ impl NvmeControllerIoChannel { let ch = NvmeIoChannel::from_raw(ctx); let mut inner = unsafe { Box::from_raw(ch.inner) }; + let qpair = inner.remove_qpair(); + // Stop the poller and do extra handling for I/O qpair, as it needs // to be detached from the poller prior poller // destruction. inner.poller.stop(); - if let Some(qpair) = inner.qpair.take() { + if let Some(qpair) = qpair { inner.poll_group.remove_qpair(&qpair); } } diff --git a/io-engine/src/bdev/nvmx/handle.rs b/io-engine/src/bdev/nvmx/handle.rs index 12a6a72ee3..1194ce1f76 100644 --- a/io-engine/src/bdev/nvmx/handle.rs +++ b/io-engine/src/bdev/nvmx/handle.rs @@ -185,7 +185,7 @@ impl NvmeDeviceHandle { fn connect_sync(&mut self) { let inner = NvmeIoChannel::inner_from_channel(self.io_channel.as_ptr()); - match inner.qpair.as_mut() { + match inner.qpair_mut() { Some(q) => { q.connect(); } @@ -198,7 +198,7 @@ impl NvmeDeviceHandle { pub(crate) async fn connect_async(&mut self) -> Result<(), CoreError> { let inner = NvmeIoChannel::inner_from_channel(self.io_channel.as_ptr()); - match inner.qpair.as_mut() { + match inner.qpair_mut() { Some(q) => q.connect_async().await, None => { error!("No I/O qpair in NvmeDeviceHandle, can't connect()"); @@ -489,7 +489,7 @@ fn check_channel_for_io( // Check against concurrent controller reset, which results in valid // I/O channel but deactivated I/O pair. - if inner.qpair.is_none() { + if inner.qpair().is_none() { errno = libc::ENODEV; } @@ -566,7 +566,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let rc = unsafe { spdk_nvme_ns_cmd_read( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), **buffer, offset_blocks, num_blocks as u32, @@ -656,7 +656,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let rc = unsafe { spdk_nvme_ns_cmd_write( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), **buffer, offset_blocks, num_blocks as u32, @@ -734,7 +734,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { unsafe { spdk_nvme_ns_cmd_read( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), (*iov).iov_base, offset_blocks, num_blocks as u32, @@ -747,7 +747,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { unsafe { spdk_nvme_ns_cmd_readv( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), offset_blocks, num_blocks as u32, Some(nvme_io_done), @@ -809,7 +809,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { unsafe { spdk_nvme_ns_cmd_write( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), (*iov).iov_base, offset_blocks, num_blocks as u32, @@ -822,7 +822,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { unsafe { spdk_nvme_ns_cmd_writev( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), offset_blocks, num_blocks as u32, Some(nvme_writev_done), @@ -905,7 +905,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let rc = unsafe { spdk_nvme_ns_cmd_flush( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), Some(nvme_flush_completion), bio as *mut c_void, ) @@ -1003,7 +1003,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { spdk_nvme_ns_cmd_dataset_management( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), utils::NvmeDsmAttribute::Deallocate as u32, dsm_ranges, num_ranges as u16, @@ -1062,7 +1062,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let rc = unsafe { spdk_nvme_ns_cmd_write_zeroes( self.ns.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), offset_blocks, num_blocks as u32, Some(nvme_io_done), @@ -1126,7 +1126,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let inner = NvmeIoChannel::inner_from_channel(self.io_channel.as_ptr()); // Make sure channel allows I/O. - if inner.qpair.is_none() { + if inner.qpair().is_none() { return Err(CoreError::NvmeAdminDispatch { source: Errno::ENODEV, opcode: cmd.opc(), @@ -1309,7 +1309,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { let inner = NvmeIoChannel::inner_from_channel(self.io_channel.as_ptr()); // Make sure channel allows I/O. - if inner.qpair.is_none() { + if inner.qpair().is_none() { return Err(CoreError::NvmeIoPassthruDispatch { source: Errno::ENODEV, opcode: nvme_cmd.opc(), @@ -1326,7 +1326,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { unsafe { spdk_nvme_ctrlr_cmd_io_raw( self.ctrlr.as_ptr(), - inner.qpair.as_mut().unwrap().as_ptr(), + inner.qpair_ptr(), &mut pcmd, ptr, size as u32, diff --git a/io-engine/src/bdev/nvmx/mod.rs b/io-engine/src/bdev/nvmx/mod.rs index b7ed11ff19..84dbe5deda 100644 --- a/io-engine/src/bdev/nvmx/mod.rs +++ b/io-engine/src/bdev/nvmx/mod.rs @@ -5,10 +5,13 @@ use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use channel::{NvmeControllerIoChannel, NvmeIoChannel, NvmeIoChannelInner}; pub use controller::NvmeController; +use controller_inner::SpdkNvmeController; pub use controller_state::NvmeControllerState; pub use device::{lookup_by_name, open_by_name, NvmeBlockDevice}; pub use handle::{nvme_io_ctx_pool_init, NvmeDeviceHandle}; pub use namespace::NvmeNamespace; +use poll_group::PollGroup; +pub use qpair::{QPair, QPairState}; pub use snapshot::{NvmeSnapshotMessage, NvmeSnapshotMessageV1}; pub(crate) use uri::NvmfDeviceTemplate; @@ -24,6 +27,8 @@ mod controller_state; mod device; mod handle; mod namespace; +mod poll_group; +mod qpair; mod snapshot; mod uri; pub mod utils; diff --git a/io-engine/src/bdev/nvmx/poll_group.rs b/io-engine/src/bdev/nvmx/poll_group.rs new file mode 100644 index 0000000000..a032006807 --- /dev/null +++ b/io-engine/src/bdev/nvmx/poll_group.rs @@ -0,0 +1,62 @@ +use std::{os::raw::c_void, ptr::NonNull}; + +use spdk_rs::libspdk::{ + spdk_nvme_poll_group, + spdk_nvme_poll_group_add, + spdk_nvme_poll_group_create, + spdk_nvme_poll_group_destroy, + spdk_nvme_poll_group_remove, +}; + +use crate::core::CoreError; + +use super::QPair; + +/// Wrapper for NVMe SPDK poll group structure. +pub(super) struct PollGroup(NonNull); + +impl PollGroup { + /// Creates a poll group. + pub(super) fn create( + ctx: *mut c_void, + ctrlr_name: &str, + ) -> Result { + let poll_group: *mut spdk_nvme_poll_group = + unsafe { spdk_nvme_poll_group_create(ctx, std::ptr::null_mut()) }; + + if poll_group.is_null() { + Err(CoreError::GetIoChannel { + name: ctrlr_name.to_string(), + }) + } else { + Ok(Self(NonNull::new(poll_group).unwrap())) + } + } + + /// Adds I/O qpair to poll group. + pub(super) fn add_qpair(&mut self, qpair: &QPair) -> i32 { + unsafe { spdk_nvme_poll_group_add(self.0.as_ptr(), qpair.as_ptr()) } + } + + /// Removes I/O qpair to poll group. + pub(super) fn remove_qpair(&mut self, qpair: &QPair) -> i32 { + unsafe { spdk_nvme_poll_group_remove(self.0.as_ptr(), qpair.as_ptr()) } + } + + /// Gets SPDK handle for poll group. + #[inline(always)] + pub(super) fn as_ptr(&self) -> *mut spdk_nvme_poll_group { + self.0.as_ptr() + } +} + +impl Drop for PollGroup { + fn drop(&mut self) { + trace!("dropping poll group {:p}", self.0.as_ptr()); + let rc = unsafe { spdk_nvme_poll_group_destroy(self.0.as_ptr()) }; + if rc < 0 { + error!("Error on poll group destroy: {}", rc); + } + trace!("poll group {:p} successfully dropped", self.0.as_ptr()); + } +} diff --git a/io-engine/src/bdev/nvmx/qpair.rs b/io-engine/src/bdev/nvmx/qpair.rs new file mode 100644 index 0000000000..81f517f433 --- /dev/null +++ b/io-engine/src/bdev/nvmx/qpair.rs @@ -0,0 +1,550 @@ +use std::{ + cell::RefCell, + cmp::max, + fmt::{Debug, Formatter}, + mem::size_of, + rc::Rc, +}; + +use futures::channel::oneshot; + +use spdk_rs::libspdk::{ + nvme_qpair_abort_all_queued_reqs, + nvme_transport_qpair_abort_reqs, + spdk_nvme_ctrlr, + spdk_nvme_ctrlr_alloc_io_qpair, + spdk_nvme_ctrlr_connect_io_qpair, + spdk_nvme_ctrlr_disconnect_io_qpair, + spdk_nvme_ctrlr_free_io_qpair, + spdk_nvme_ctrlr_get_default_io_qpair_opts, + spdk_nvme_io_qpair_opts, + spdk_nvme_qpair, +}; + +#[cfg(feature = "spdk-async-qpair-connect")] +use std::{os::raw::c_void, time::Duration}; + +#[cfg(feature = "spdk-async-qpair-connect")] +use spdk_rs::{ + libspdk::{ + spdk_nvme_ctrlr_connect_io_qpair_async, + spdk_nvme_ctrlr_io_qpair_connect_poll_async, + spdk_nvme_io_qpair_connect_ctx, + }, + Poller, + PollerBuilder, + UnsafeRef, +}; + +#[cfg(feature = "spdk-async-qpair-connect")] +use nix::errno::Errno; + +use crate::core::CoreError; + +use super::{nvme_bdev_running_config, SpdkNvmeController}; + +/// I/O QPair state. +#[derive(Debug, Serialize, Clone, Copy, PartialEq, PartialOrd)] +pub enum QPairState { + /// QPair is not connected. + Disconnected, + /// QPair is connecting asynchronously. + #[cfg(feature = "spdk-async-qpair-connect")] + Connecting, + /// QPair is connected. + Connected, + /// QPair is dropped. + Dropped, +} + +impl ToString for QPairState { + fn to_string(&self) -> String { + match self { + QPairState::Disconnected => "Disconnected", + #[cfg(feature = "spdk-async-qpair-connect")] + QPairState::Connecting => "Connecting", + QPairState::Connected => "Connected", + QPairState::Dropped => "Dropped", + } + .to_string() + } +} + +/// I/O QPair. +pub struct QPair { + inner: Rc>, +} + +impl Debug for QPair { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.inner.borrow().fmt(f) + } +} + +impl Drop for QPair { + fn drop(&mut self) { + unsafe { + let qpair = self.as_ptr(); + nvme_qpair_abort_all_queued_reqs(qpair, 1); + nvme_transport_qpair_abort_reqs(qpair, 1); + spdk_nvme_ctrlr_disconnect_io_qpair(qpair); + spdk_nvme_ctrlr_free_io_qpair(qpair); + } + + trace!(?self, "I/O qpair disconnected"); + + { + // Zero-out pointers to the destroyed objects and set `Dropped` + // state. + let mut inner = self.inner.borrow_mut(); + inner.qpair = std::ptr::null_mut(); + inner.ctrlr = std::ptr::null_mut(); + inner.state = QPairState::Dropped; + } + } +} + +/// Returns default qpair options. +fn get_default_options( + ctrlr_handle: SpdkNvmeController, +) -> spdk_nvme_io_qpair_opts { + let mut opts = spdk_nvme_io_qpair_opts::default(); + let default_opts = nvme_bdev_running_config(); + + unsafe { + spdk_nvme_ctrlr_get_default_io_qpair_opts( + ctrlr_handle.as_ptr(), + &mut opts, + size_of::() as u64, + ) + }; + + opts.io_queue_requests = + max(opts.io_queue_requests, default_opts.io_queue_requests); + opts.create_only = true; + + // Always assume async_mode is enabled instread of + // relying on default_opts.async_mode. + opts.async_mode = true; + + opts +} + +impl QPair { + /// Returns an SPDK qpair object pointer. + #[inline(always)] + pub fn as_ptr(&self) -> *mut spdk_nvme_qpair { + self.inner.borrow().qpair + } + + /// Returns QPair state. + #[inline(always)] + pub fn state(&self) -> QPairState { + self.inner.borrow().state + } + + /// Sets QPair state. + #[inline(always)] + fn set_state(&self, state: QPairState) { + self.inner.borrow_mut().state = state; + } + + /// Creates a qpair with default options for target NVMe controller. + pub(super) fn create( + ctrlr_handle: SpdkNvmeController, + ctrlr_name: &str, + ) -> Result { + let qpair_opts = get_default_options(ctrlr_handle); + let ctrlr = ctrlr_handle.as_ptr(); + + let qpair = unsafe { + spdk_nvme_ctrlr_alloc_io_qpair( + ctrlr, + &qpair_opts, + size_of::() as u64, + ) + }; + + if qpair.is_null() { + error!( + ?ctrlr, + ?ctrlr_name, + "failed to allocate I/O qpair for controller" + ); + + Err(CoreError::GetIoChannel { + name: ctrlr_name.to_string(), + }) + } else { + let qpair = Self { + inner: Rc::new(RefCell::new(Inner { + qpair, + ctrlr, + ctrlr_name: ctrlr_name.to_owned(), + state: QPairState::Disconnected, + waiters: Vec::new(), + })), + }; + + trace!(?qpair, "I/O qpair created for controller"); + + Ok(qpair) + } + } + + /// Connects a qpair synchronously. + pub(crate) fn connect(&self) -> i32 { + trace!(?self, "new I/O qpair connection"); + + // Check if I/O qpair is already connected to provide idempotency for + // multiple allocations of the same handle for the same thread, to make + // sure we don't reconnect every time. + if self.state() == QPairState::Connected { + trace!(?self, "I/O qpair already connected"); + return 0; + } + + // During synchronous connection we shouldn't be preemped by any other + // SPDK thread, so we can't see QPairState::Connecting. + assert_eq!( + self.state(), + QPairState::Disconnected, + "Invalid QPair state" + ); + + // Mark qpair as being connected and try to connect. + let status = self.inner.borrow().ctrlr_connect_sync(); + + // Update QPairState according to the connection result. + self.set_state(if status == 0 { + QPairState::Connected + } else { + QPairState::Disconnected + }); + + trace!(?self, ?status, "I/O qpair connected"); + + status + } +} + +#[cfg(feature = "spdk-async-qpair-connect")] +impl QPair { + /// Connects a qpair asynchronously. + pub(crate) async fn connect_async(&self) -> Result<(), CoreError> { + // Take into account other connect requests for this I/O qpair to avoid + // multiple concurrent connections. + let recv = match self.state() { + QPairState::Disconnected => self.start_new_async()?, + QPairState::Connecting => self.waiting_async()?, + QPairState::Connected => { + // Provide idempotency for multiple allocations of the same + // handle for the same thread, to make sure we don't reconnect + // every time. + trace!(?self, "I/O qpair already connected"); + return Ok(()); + } + QPairState::Dropped => { + panic!("I/O qpair is in an invalid state: {:?}", self.state()); + } + }; + + let qpair = self.as_ptr(); + let res = recv.await.map_err(|_| { + // Receiver failure may be caused by qpair having been dropped, + // so we cannot use `self` here. + error!(?qpair, "I/O qpair connection canceled"); + CoreError::OpenBdev { + source: Errno::ECANCELED, + } + })?; + + res + } + + /// Starts a new async connection and returns a receiver for it. + fn start_new_async(&self) -> Result { + trace!(?self, "new async I/O pair connection"); + + assert_eq!(self.state(), QPairState::Disconnected); + + self.set_state(QPairState::Connecting); + Connection::create(self.inner.clone()) + } + + /// Returns a receiver to wait for a connection already in progress. + fn waiting_async(&self) -> Result { + trace!( + ?self, + "new async I/O pair connection: connection already in progress" + ); + + assert_eq!(self.state(), QPairState::Connecting); + + let (s, r) = oneshot::channel(); + self.inner.borrow_mut().waiters.push(s); + Ok(r) + } +} + +type ConnectResult = Result<(), CoreError>; + +type ResultSender = oneshot::Sender; + +#[allow(dead_code)] +type ResultReceiver = oneshot::Receiver; + +/// Inner state of the QPair object. +struct Inner { + qpair: *mut spdk_nvme_qpair, + ctrlr: *mut spdk_nvme_ctrlr, + ctrlr_name: String, + state: QPairState, + waiters: Vec, +} + +impl Debug for Inner { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QPair") + .field("qpair", &self.qpair) + .field("ctrlr", &self.ctrlr) + .field("ctrlr_name", &self.ctrlr_name) + .field("state", &self.state) + .field("waiters", &self.waiters.len()) + .finish() + } +} + +impl Inner { + /// Connects synchronously. + fn ctrlr_connect_sync(&self) -> i32 { + unsafe { spdk_nvme_ctrlr_connect_io_qpair(self.ctrlr, self.qpair) } + } + + /// Connects asynchronously. + #[cfg(feature = "spdk-async-qpair-connect")] + fn ctrlr_connect_async( + &self, + ctx: &mut Connection, + ) -> Result<*mut spdk_nvme_io_qpair_connect_ctx, CoreError> { + let res = unsafe { + spdk_nvme_ctrlr_connect_io_qpair_async( + self.ctrlr, + self.qpair, + Some(qpair_connect_cb), + ctx as *mut _ as *mut c_void, + ) + }; + + if res.is_null() { + error!( + ?self, + "Failed to initiate asynchronous connection on a qpair" + ); + + Err(CoreError::OpenBdev { + source: Errno::ENXIO, + }) + } else { + Ok(res) + } + } +} + +/// Async QPair connection. +#[cfg(feature = "spdk-async-qpair-connect")] +struct Connection<'a> { + inner: Rc>, + sender: Option, + poller: Option>>, + probe: *mut spdk_nvme_io_qpair_connect_ctx, +} + +#[cfg(feature = "spdk-async-qpair-connect")] +impl Debug for Connection<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.inner.borrow().fmt(f) + } +} + +#[cfg(feature = "spdk-async-qpair-connect")] +impl Drop for Connection<'_> { + fn drop(&mut self) { + trace!(?self, "I/O connection dropped") + } +} + +#[cfg(feature = "spdk-async-qpair-connect")] +impl<'a> Connection<'a> { + /// Creats a new async qpair connection, and returns a receiver to await the + /// completion. + fn create(inner: Rc>) -> Result { + // Create sender-receiver pair to notify about connection completion. + let (sender, receiver) = oneshot::channel(); + + // Create self (boxed). + let mut ctx = Box::new(Self { + inner: inner.clone(), + sender: Some(sender), + poller: None, + probe: std::ptr::null_mut(), + }); + + // Start poller for the async connection. + ctx.poller = Some( + PollerBuilder::new() + .with_name("io_qpair_connect_poller") + .with_interval(Duration::from_millis(1)) + .with_data(UnsafeRef::new(ctx.as_mut())) + .with_poll_fn(Connection::poll_cb) + .build(), + ); + + // Start async connection, and create a probe for it. + ctx.probe = inner.borrow().ctrlr_connect_async(ctx.as_mut())?; + + // Context is now managed by the SPDK async connection and the poller. + Box::leak(ctx); + + Ok(receiver) + } + + /// Poller's callback. + fn poll_cb(arg: &UnsafeRef) -> i32 { + let conn = unsafe { arg.as_ref() }; + match conn.poll() { + Ok(r) => { + if r { + 0 // continue polling + } else { + 1 // stop the poller + } + } + Err(e) => { + // Error occured, so SPDK won't call connection callback. + // Notify about failure and stop the poller. + let conn = unsafe { Box::from_raw(arg.as_ptr()) }; + conn.complete(Err(CoreError::OpenBdev { + source: e, + })); + 1 // stop the poller + } + } + } + + // Polls the async connection probe. + // Returns true to continue polling, false to stop. + fn poll(&self) -> Result { + if self.state() == QPairState::Dropped { + warn!( + ?self, + "I/O qpair instance dropped, stopping polling \ + async connection probe" + ); + + // `QPairState::Dropped` indicates that the QPair instance was + // dropped, and qpair was destroyed. Free the + // probe manually here as SPDK won't have a chance to do it, and + // stop polling. + unsafe { libc::free(self.probe as *mut _ as *mut c_void) }; + + return Err(Errno::ECANCELED); + } + + // Poll the probe. In the case of a success or an error, the probe will + // be freed by SPDK. + let res = unsafe { + spdk_nvme_ctrlr_io_qpair_connect_poll_async( + self.qpair(), + self.probe, + ) + }; + + match res { + // Connection is complete, callback has been called and this + // connection instance already dropped. + // Warning: does use `self` here. + 0 => Ok(false), + // Connection is still in progress, keep polling. + 1 => Ok(true), + // Error occured during polling. + e => { + let e = Errno::from_i32(-e); + error!(?self, "I/O qpair async connection polling error: {e}"); + Err(e) + } + } + } + + /// Consumes `self` and completes the connection operation with the given + /// result. + fn complete(mut self, res: ConnectResult) { + if let Err(err) = &res { + error!(?self, ?err, "I/O qpair async connection failed"); + } else { + trace!(?self, "I/O qpair successfully connected"); + } + + // Stop the poller. + self.poller.take(); + + // Set state. + if self.state() == QPairState::Connecting { + self.set_state(if res.is_ok() { + QPairState::Connected + } else { + QPairState::Disconnected + }); + } + + let waiters = self.take_waiters(); + + // Notify the listener. + self.sender + .take() + .expect("I/O pair connection object must have been initialized") + .send(res.clone()) + .expect("Failed to notify I/O qpair connection listener"); + + // Notify the waiters. + waiters.into_iter().for_each(|w| { + w.send(res.clone()) + .expect("Failed to notify a connection waiter"); + }); + } + + /// Returns SPDK qpair pointer . + #[inline] + fn qpair(&self) -> *mut spdk_nvme_qpair { + self.inner.borrow().qpair + } + + /// Returns QPair connection state. + #[inline] + fn state(&self) -> QPairState { + self.inner.borrow().state + } + + /// Sets QPair connection state. + #[inline] + fn set_state(&self, state: QPairState) { + self.inner.borrow_mut().state = state + } + + /// Takes out the waiter list to avoid iterating under borrow. + fn take_waiters(&self) -> Vec { + let mut waiters = Vec::new(); + std::mem::swap(&mut self.inner.borrow_mut().waiters, &mut waiters); + waiters + } +} + +/// Async connection callback. +#[cfg(feature = "spdk-async-qpair-connect")] +extern "C" fn qpair_connect_cb( + _qpair: *mut spdk_nvme_qpair, + cb_arg: *mut c_void, +) { + let ctx = unsafe { Box::from_raw(cb_arg as *mut Connection) }; + ctx.complete(Ok(())); +} diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs index 99e018d820..536abcc28d 100644 --- a/io-engine/src/bin/io-engine.rs +++ b/io-engine/src/bin/io-engine.rs @@ -35,6 +35,21 @@ use version_info::fmt_package_info; const PAGES_NEEDED: u32 = 1024; +macro_rules! print_feature { + ($txt:literal, $feat:tt) => {{ + info!( + "{txt} feature ('{feat}') is {s}", + txt = $txt, + feat = $feat, + s = if cfg!(feature = $feat) { + "enabled" + } else { + "disabled" + } + ); + }}; +} + io_engine::CPS_INIT!(); fn start_tokio_runtime(args: &MayastorCliArgs) { let grpc_address = grpc::endpoint(args.grpc_endpoint.clone()); @@ -67,6 +82,10 @@ fn start_tokio_runtime(args: &MayastorCliArgs) { warn!("Nexus reset is disabled"); } + print_feature!("Async QPair connection", "spdk-async-qpair-connect"); + print_feature!("SPDK subsystem events", "spdk-subsystem-events"); + print_feature!("Nexus-level fault injection", "nexus-fault-injection"); + // Initialize Lock manager. let cfg = ResourceLockManagerConfig::default() .with_subsystem(ProtectedSubsystems::NEXUS, 512); diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs index a0484cc8f6..a5f1a16525 100644 --- a/io-engine/src/rebuild/rebuild_descriptor.rs +++ b/io-engine/src/rebuild/rebuild_descriptor.rs @@ -1,12 +1,8 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; -use snafu::ResultExt; -use super::{ - rebuild_error::{NoBdevHandle, RebuildError}, - RebuildMap, -}; +use super::{rebuild_error::RebuildError, RebuildMap}; use crate::core::{BlockDeviceDescriptor, BlockDeviceHandle, DescriptorGuard}; /// Contains all descriptors and their associated information which allows the @@ -66,12 +62,16 @@ impl RebuildDescriptor { pub(super) async fn io_handle( descriptor: &dyn BlockDeviceDescriptor, ) -> Result, RebuildError> { - descriptor - .get_io_handle_nonblock() - .await - .context(NoBdevHandle { + descriptor.get_io_handle_nonblock().await.map_err(|e| { + error!( + "{dev}: failed to get I/O handle: {e}", + dev = descriptor.device_name() + ); + RebuildError::NoBdevHandle { + source: e, bdev: descriptor.get_device().device_name(), - }) + } + }) } /// Checks if the block has to be transferred. diff --git a/io-engine/tests/nexus_rebuild_partial.rs b/io-engine/tests/nexus_rebuild_partial.rs index 00e23a7fe1..9c03dd65be 100644 --- a/io-engine/tests/nexus_rebuild_partial.rs +++ b/io-engine/tests/nexus_rebuild_partial.rs @@ -218,8 +218,8 @@ async fn nexus_partial_rebuild_io_fault() { // Check that the nexus child is now faulted, with I/O failure reason. let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children.len(), 2); - assert_eq!(children[1].state, ChildState::Faulted as i32); - assert_eq!(children[1].state_reason, ChildStateReason::IoFailure as i32); + assert_eq!(children[1].state(), ChildState::Faulted); + assert_eq!(children[1].state_reason(), ChildStateReason::IoFailure); assert_eq!(children[1].has_io_log, true); // Chunk B. @@ -323,8 +323,8 @@ async fn nexus_partial_rebuild_offline_online() { .unwrap(); let children = nex_0.get_nexus().await.unwrap().children; - assert_eq!(children[0].state, ChildState::Degraded as i32); - assert_eq!(children[0].state_reason, ChildStateReason::ByClient as i32); + assert_eq!(children[0].state(), ChildState::Degraded); + assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); validate_replicas(&vec![repl_0.clone(), repl_1.clone()]).await; @@ -477,8 +477,8 @@ async fn nexus_partial_rebuild_double_fault() { .await .unwrap(); let children = nex_0.get_nexus().await.unwrap().children; - assert_eq!(children[0].state, ChildState::Degraded as i32); - assert_eq!(children[0].state_reason, ChildStateReason::ByClient as i32); + assert_eq!(children[0].state(), ChildState::Degraded); + assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); // Write some data to have something to rebuild. test_fio_to_nexus( @@ -531,10 +531,14 @@ async fn nexus_partial_rebuild_double_fault() { let _ = tokio::join!(j0, j1); - // Replica must now be faulted with I/O failure. + // Replica must now be faulted with I/O failure, or, rarely, rebuild failure + // may come first. let children = nex_0.get_nexus().await.unwrap().children; - assert_eq!(children[0].state, ChildState::Faulted as i32); - assert_eq!(children[0].state_reason, ChildStateReason::IoFailure as i32); + assert_eq!(children[0].state(), ChildState::Faulted); + assert!(matches!( + children[0].state_reason(), + ChildStateReason::IoFailure | ChildStateReason::RebuildFailed + )); // [6] nex_0.online_child_replica(&repl_0).await.unwrap(); @@ -555,8 +559,8 @@ async fn nexus_partial_rebuild_double_fault() { .await .unwrap(); let children = nex_0.get_nexus().await.unwrap().children; - assert_eq!(children[0].state, ChildState::Degraded as i32); - assert_eq!(children[0].state_reason, ChildStateReason::ByClient as i32); + assert_eq!(children[0].state(), ChildState::Degraded); + assert_eq!(children[0].state_reason(), ChildStateReason::ByClient); // Write some data to have something to rebuild. test_fio_to_nexus( diff --git a/io-engine/tests/nvmf_connect.rs b/io-engine/tests/nvmf_connect.rs new file mode 100644 index 0000000000..ee37964bf1 --- /dev/null +++ b/io-engine/tests/nvmf_connect.rs @@ -0,0 +1,156 @@ +use std::pin::Pin; + +use once_cell::sync::OnceCell; + +pub mod common; + +use common::MayastorTest; + +use io_engine::{ + bdev::{device_create, device_destroy, device_lookup}, + bdev_api::BdevError, + core::{CoreError, MayastorCliArgs, Share}, + lvs::{Lvs, LvsLvol}, + pool_backend::PoolArgs, +}; + +static MAYASTOR: OnceCell = OnceCell::new(); + +fn get_ms() -> &'static MayastorTest<'static> { + MAYASTOR.get_or_init(|| MayastorTest::new(MayastorCliArgs::default())) +} + +/// Runs several non-blocking qpair connections in parallel. +#[tokio::test] +async fn nvmf_connect_async() { + common::composer_init(); + + let uri = init_nvmf_share().await; + + for _ in 0 .. 20 { + let name = spawn_device_create(&uri).await; + + let f0 = spawn_get_io_handle_nonblock(&name); + let f1 = spawn_get_io_handle_nonblock(&name); + let f2 = spawn_get_io_handle_nonblock(&name); + + let res = tokio::join!(f0, f1, f2); + assert!(res.0.is_ok()); + assert!(res.1.is_ok()); + assert!(res.2.is_ok()); + + spawn_device_destroy(&uri).await.unwrap(); + } + + deinit_nvmf_share().await; +} + +/// Runs several non-blocking qpair connections, and a device destroy in +/// parallel. +#[tokio::test] +async fn nvmf_connect_async_drop() { + common::composer_init(); + + let uri = init_nvmf_share().await; + + for _ in 0 .. 20 { + let name = spawn_device_create(&uri).await; + + let f0 = spawn_get_io_handle_nonblock(&name); + let f1 = spawn_get_io_handle_nonblock(&name); + let f2 = spawn_get_io_handle_nonblock(&name); + let f3 = spawn_device_destroy(&uri); + + let res = tokio::join!(f0, f1, f2, f3); + + #[cfg(feature = "spdk-async-qpair-connect")] + { + assert!(res.0.is_err()); + assert!(res.1.is_err()); + assert!(res.2.is_err()); + } + + #[cfg(not(feature = "spdk-async-qpair-connect"))] + { + assert!(res.0.is_err()); + assert!(res.1.is_err()); + assert!(res.2.is_err()); + } + + assert!(res.3.is_ok()); + } + + deinit_nvmf_share().await; +} + +const POOL_SIZE: u64 = 64 * 1024 * 1024; +const BDEV_NAME: &str = "malloc:///mem0?size_mb=128"; +const POOL_NAME: &str = "pool_0"; +const REPL_NAME: &str = "repl_0"; +const REPL_UUID: &str = "65acdaac-14c4-41d8-a55e-d03bfd7185a4"; + +async fn init_nvmf_share() -> String { + get_ms() + .spawn(async { + let pool = Lvs::create_or_import(PoolArgs { + name: POOL_NAME.to_string(), + disks: vec![BDEV_NAME.to_string()], + uuid: None, + }) + .await + .unwrap(); + + let mut lvol = pool + .create_lvol(REPL_NAME, POOL_SIZE, Some(REPL_UUID), false) + .await + .unwrap(); + + let mut lvol = Pin::new(&mut lvol); + lvol.as_mut().share_nvmf(None).await.unwrap(); + lvol.as_bdev().share_uri().unwrap() + }) + .await +} + +async fn deinit_nvmf_share() { + get_ms() + .spawn(async { + Lvs::lookup(POOL_NAME).unwrap().destroy().await.unwrap(); + }) + .await; +} + +async fn spawn_device_create(uri: &str) -> String { + get_ms() + .spawn({ + let uri = uri.to_string(); + async move { device_create(&uri).await.unwrap() } + }) + .await +} + +async fn spawn_get_io_handle_nonblock(name: &str) -> Result<(), CoreError> { + get_ms() + .spawn({ + let name = name.to_string(); + async move { + device_lookup(&name) + .unwrap() + .open(true) + .unwrap() + .get_io_handle_nonblock() + .await + .map(|_| ()) + } + }) + .await +} + +async fn spawn_device_destroy(uri: &str) -> Result<(), BdevError> { + get_ms() + .spawn({ + let uri = uri.to_string(); + async move { device_destroy(&uri).await } + }) + .await +} diff --git a/spdk-rs b/spdk-rs index e5f27b4d7c..c472c78fe8 160000 --- a/spdk-rs +++ b/spdk-rs @@ -1 +1 @@ -Subproject commit e5f27b4d7c92ccf8d03dc109e6ae962c60072a7a +Subproject commit c472c78fe82ebe919936a29e514ae6a0215e0027