diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs
index 0ac9526a0..ffc74259c 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev.rs
@@ -1373,30 +1373,38 @@ impl<'n> BdevOps for Nexus<'n> {
             return;
         }
 
-        let self_ptr = unsafe { unsafe_static_ptr(&self) };
-
-        Reactor::block_on(async move {
-            let self_ref = unsafe { &mut *self_ptr };
-
-            // TODO: double-check interaction with rebuild job logic
-            // TODO: cancel rebuild jobs?
-            let n = self_ref.children.iter().filter(|c| c.is_opened()).count();
-
-            if n > 0 {
-                warn!(
-                    "{:?}: {} open children remain(s), closing...",
-                    self_ref, n
-                );
+        let online_children =
+            self.children.iter().filter(|c| c.is_opened()).count();
+        // TODO: This doesn't seem possible to happen at this stage, but it
+        //       seems we should still handle this in a separate future,
+        //       since we're handling it here anyway and a block_on is not
+        //       safe to use in production code.
+        if online_children > 0 {
+            let self_ptr = unsafe { unsafe_static_ptr(&self) };
+            Reactor::block_on(async move {
+                let self_ref = unsafe { &mut *self_ptr };
+
+                // TODO: double-check interaction with rebuild job logic
+                // TODO: cancel rebuild jobs?
+                let n =
+                    self_ref.children.iter().filter(|c| c.is_opened()).count();
+
+                if n > 0 {
+                    warn!(
+                        "{:?}: {} open children remain(s), closing...",
+                        self_ref, n
+                    );
 
-                for child in self_ref.children.iter() {
-                    if child.is_opened() {
-                        child.close().await.ok();
+                    for child in self_ref.children.iter() {
+                        if child.is_opened() {
+                            child.close().await.ok();
+                        }
                     }
                 }
-            }
 
-            self_ref.children.clear();
-        });
+                self_ref.children.clear();
+            });
+        }
 
         self.as_mut().unregister_io_device();
         unsafe {
diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
index 2e5eca333..ecbc2b2b7 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
@@ -900,7 +900,7 @@ impl<'n> Nexus<'n> {
                    nexus_name, child_device,
                    "Unplugging nexus child device",
                );
-                child.unplug();
+                child.unplug().await;
            }
            None => {
                warn!(
diff --git a/io-engine/src/bdev/nexus/nexus_child.rs b/io-engine/src/bdev/nexus/nexus_child.rs
index d7dab1f31..c696d3cbd 100644
--- a/io-engine/src/bdev/nexus/nexus_child.rs
+++ b/io-engine/src/bdev/nexus/nexus_child.rs
@@ -24,8 +24,6 @@ use crate::{
         BlockDeviceHandle,
         CoreError,
         DeviceEventSink,
-        Reactor,
-        Reactors,
         VerboseError,
     },
     eventing::replica_events::state_change_event_meta,
@@ -1109,7 +1107,7 @@ impl<'c> NexusChild<'c> {
     /// underlying device is removed.
     ///
     /// Note: The descriptor *must* be dropped for the unplug to complete.
-    pub(crate) fn unplug(&mut self) {
+    pub(crate) async fn unplug(&mut self) {
         info!("{self:?}: unplugging child...");
 
         let state = self.state();
@@ -1139,12 +1137,10 @@
         // device-related events directly.
         if state != ChildState::Faulted(FaultReason::IoError) {
             let nexus_name = self.parent.clone();
-            Reactor::block_on(async move {
-                match nexus_lookup_mut(&nexus_name) {
-                    Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
-                    None => error!("Nexus '{nexus_name}' not found"),
-                }
-            });
+            match nexus_lookup_mut(&nexus_name) {
+                Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
+                None => error!("Nexus '{nexus_name}' not found"),
+            }
         }
 
         if is_destroying {
@@ -1153,22 +1149,16 @@
             self.device_descriptor.take();
         }
 
-        self.unplug_complete();
-        info!("{self:?}: child successfully unplugged");
+        self.unplug_complete().await;
     }
 
     /// Signal that the child unplug is complete.
-    fn unplug_complete(&self) {
-        let sender = self.remove_channel.0.clone();
-        let name = self.name.clone();
-        Reactors::current().send_future(async move {
-            if let Err(e) = sender.send(()).await {
-                error!(
-                    "Failed to send unplug complete for child '{}': {}",
-                    name, e
-                );
-            }
-        });
+    async fn unplug_complete(&self) {
+        if let Err(error) = self.remove_channel.0.send(()).await {
+            info!("{self:?}: failed to send unplug complete: {error}");
+        } else {
+            info!("{self:?}: child successfully unplugged");
+        }
     }
 
     /// create a new nexus child
diff --git a/io-engine/src/bdev/nvmx/controller.rs b/io-engine/src/bdev/nvmx/controller.rs
index 1751a91f6..b98b97c13 100644
--- a/io-engine/src/bdev/nvmx/controller.rs
+++ b/io-engine/src/bdev/nvmx/controller.rs
@@ -1071,8 +1071,11 @@ pub(crate) mod options {
            self.admin_timeout_ms = Some(timeout);
            self
        }
-        pub fn with_fabrics_connect_timeout_us(mut self, timeout: u64) -> Self {
-            self.fabrics_connect_timeout_us = Some(timeout);
+        pub fn with_fabrics_connect_timeout_us<T: Into<Option<u64>>>(
+            mut self,
+            timeout: T,
+        ) -> Self {
+            self.fabrics_connect_timeout_us = timeout.into();
            self
        }
 
diff --git a/io-engine/src/bdev/nvmx/qpair.rs b/io-engine/src/bdev/nvmx/qpair.rs
index 141364ce9..ecdf7bbca 100644
--- a/io-engine/src/bdev/nvmx/qpair.rs
+++ b/io-engine/src/bdev/nvmx/qpair.rs
@@ -467,9 +467,9 @@ impl<'a> Connection<'a> {
            0 => Ok(false),
            // Connection is still in progress, keep polling.
            1 => Ok(true),
-            // Error occured during polling.
+            // Error occurred during polling.
            e => {
-                let e = Errno::from_i32(-e);
+                let e = Errno::from_i32(e.abs());
                error!(?self, "I/O qpair async connection polling error: {e}");
                Err(e)
            }
diff --git a/io-engine/src/bdev/nvmx/uri.rs b/io-engine/src/bdev/nvmx/uri.rs
index 29b9d4b44..856b60655 100644
--- a/io-engine/src/bdev/nvmx/uri.rs
+++ b/io-engine/src/bdev/nvmx/uri.rs
@@ -227,6 +227,12 @@ impl<'probe> NvmeControllerContext<'probe> {
            )
            .with_transport_retry_count(
                Config::get().nvme_bdev_opts.transport_retry_count as u8,
+            )
+            .with_fabrics_connect_timeout_us(
+                crate::subsys::config::opts::try_from_env(
+                    "NVMF_FABRICS_CONNECT_TIMEOUT",
+                    1_000_000,
+                ),
            );
 
        let hostnqn = template.hostnqn.clone().or_else(|| {
diff --git a/io-engine/src/core/reactor.rs b/io-engine/src/core/reactor.rs
index f2758bc99..e93fac017 100644
--- a/io-engine/src/core/reactor.rs
+++ b/io-engine/src/core/reactor.rs
@@ -362,8 +362,15 @@ impl Reactor {
        task
    }
 
-    /// spawn a future locally on the current core block until the future is
+    /// Spawns a future locally on the current core, blocking until the future is
    /// completed. The master core is used.
+    /// # Warning
+    /// This code should only be used for testing, and not in production!
+    /// This is because when calling block_on from a thread_poll callback, we
+    /// may be leaving messages behind, which can lead to timeouts, etc.
+    /// A work-around to make this safe could be to "pull" the messages
+    /// which haven't been polled, and poll them here before proceeding to
+    /// re-poll via thread_poll again.
    pub fn block_on<F, R>(future: F) -> Option<R>
    where
        F: Future<Output = R> + 'static,
diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs
index b1c978646..70531a836 100644
--- a/io-engine/src/grpc/mod.rs
+++ b/io-engine/src/grpc/mod.rs
@@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver;
 use nix::errno::Errno;
 pub use server::MayastorGrpcServer;
 use std::{
-    error::Error,
     fmt::{Debug, Display},
     future::Future,
     time::Duration,
@@ -158,22 +157,6 @@ macro_rules! spdk_submit {
 
 pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
 
-/// call the given future within the context of the reactor on the first core
-/// on the init thread, while the future is waiting to be completed the reactor
-/// is continuously polled so that forward progress can be made
-pub fn rpc_call<G, I, L, A>(future: G) -> Result<Response<A>, tonic::Status>
-where
-    G: Future<Output = Result<I, L>> + 'static,
-    I: 'static,
-    L: Into<Status> + Error + 'static,
-    A: 'static + From<I>,
-{
-    Reactor::block_on(future)
-        .unwrap()
-        .map(|r| Response::new(A::from(r)))
-        .map_err(|e| e.into())
-}
-
 /// Submit rpc code to the primary reactor.
 pub fn rpc_submit<F, R, E>(
     future: F,
diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs
index aece4fcda..fb9ad70be 100644
--- a/io-engine/src/subsys/config/opts.rs
+++ b/io-engine/src/subsys/config/opts.rs
@@ -156,7 +156,7 @@ pub struct NvmfTcpTransportOpts {
 }
 
 /// try to read an env variable or returns the default when not found
-fn try_from_env<T>(name: &str, default: T) -> T
+pub(crate) fn try_from_env<T>(name: &str, default: T) -> T
 where
     T: FromStr + Display + Copy,
     <T as FromStr>::Err: Debug + Display,
diff --git a/io-engine/src/subsys/nvmf/target.rs b/io-engine/src/subsys/nvmf/target.rs
index 56c23f281..833d39d2d 100644
--- a/io-engine/src/subsys/nvmf/target.rs
+++ b/io-engine/src/subsys/nvmf/target.rs
@@ -27,7 +27,7 @@ use spdk_rs::libspdk::{
 
 use crate::{
     constants::NVME_CONTROLLER_MODEL_ID,
-    core::{Cores, Mthread, Reactor, Reactors},
+    core::{Cores, Mthread, Reactors},
     ffihelper::{AsStr, FfiResult},
     subsys::{
         nvmf::{
@@ -270,9 +270,9 @@
        Ok(())
    }
 
-    /// enable discovery for the target -- note that the discovery system is not
-    /// started
-    fn enable_discovery(&self) {
+    /// Create the discovery subsystem for the target -- note that the
+    /// discovery system is not started.
+    fn create_discovery_subsystem(&self) -> NvmfSubsystem {
        debug!("enabling discovery for target");
        let discovery = unsafe {
            NvmfSubsystem::from(spdk_nvmf_subsystem_create(
@@ -296,12 +296,7 @@
 
        discovery.allow_any(true);
 
-        Reactor::block_on(async {
-            let nqn = discovery.get_nqn();
-            if let Err(e) = discovery.start().await {
-                error!("Error starting subsystem '{}': {}", nqn, e.to_string());
-            }
-        });
+        discovery
    }
 
    /// stop all subsystems on this target we are borrowed here
@@ -355,13 +350,20 @@
 
    /// Final state for the target during init.
    pub fn running(&mut self) {
-        self.enable_discovery();
-        info!(
-            "nvmf target accepting new connections and is ready to roll..{}",
-            '\u{1F483}'
-        );
+        let discovery = self.create_discovery_subsystem();
 
-        unsafe { spdk_subsystem_init_next(0) }
+        Reactors::master().send_future(async move {
+            let nqn = discovery.get_nqn();
+            if let Err(error) = discovery.start().await {
+                error!("Error starting subsystem '{nqn}': {error}");
+            }
+
+            info!(
+                "nvmf target accepting new connections and is ready to roll..{}",
+                '\u{1F483}'
+            );
+            unsafe { spdk_subsystem_init_next(0) }
+        })
    }
 
    /// Shutdown procedure.
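
Note on the fabrics connect timeout change: making `with_fabrics_connect_timeout_us` generic over `T: Into<Option<u64>>` lets existing callers keep passing a bare `u64` (via the standard `impl<T> From<T> for Option<T>`), while the new `uri.rs` call site can forward whatever `try_from_env` returns for `NVMF_FABRICS_CONNECT_TIMEOUT`. Below is a minimal, self-contained sketch of that pattern; the struct and helper are simplified stand-ins for illustration, not the actual `io-engine` types.

```rust
use std::{env, str::FromStr};

/// Simplified stand-in for the NVMe controller options builder touched above.
#[derive(Debug, Default)]
struct Opts {
    fabrics_connect_timeout_us: Option<u64>,
}

impl Opts {
    /// Accepts either a bare `u64` or an `Option<u64>`, so callers can pass
    /// an env-derived value straight through without unwrapping it.
    fn with_fabrics_connect_timeout_us<T: Into<Option<u64>>>(
        mut self,
        timeout: T,
    ) -> Self {
        self.fabrics_connect_timeout_us = timeout.into();
        self
    }
}

/// Simplified version of the `try_from_env` helper: fall back to `default`
/// when the variable is unset or fails to parse.
fn try_from_env<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|value| value.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    // Existing call sites keep compiling with a plain integer...
    let fixed = Opts::default().with_fabrics_connect_timeout_us(500_000u64);

    // ...while the new uri.rs call site feeds the env override directly,
    // defaulting to 1 second (1_000_000 us) when the variable is absent.
    let tunable = Opts::default().with_fabrics_connect_timeout_us(
        try_from_env::<u64>("NVMF_FABRICS_CONNECT_TIMEOUT", 1_000_000),
    );

    println!("{fixed:?}\n{tunable:?}");
}
```

In the sketch, passing `None` is also accepted and simply leaves the option unset.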