diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index 0ac9526a0..a85baac81 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -1373,30 +1373,38 @@ impl<'n> BdevOps for Nexus<'n> { return; } - let self_ptr = unsafe { unsafe_static_ptr(&self) }; - - Reactor::block_on(async move { - let self_ref = unsafe { &mut *self_ptr }; - - // TODO: double-check interaction with rebuild job logic - // TODO: cancel rebuild jobs? - let n = self_ref.children.iter().filter(|c| c.is_opened()).count(); - - if n > 0 { - warn!( - "{:?}: {} open children remain(s), closing...", - self_ref, n - ); + let open_children = + self.children.iter().filter(|c| c.is_opened()).count(); + // TODO: This doesn't seem possible to happen at this stage, but seems + // we should still try to handle this in a separate future since + // we're handling it here anyway as a block_on is not safe to + // use for running production code. + if open_children > 0 { + let self_ptr = unsafe { unsafe_static_ptr(&self) }; + Reactor::block_on(async move { + let self_ref = unsafe { &mut *self_ptr }; + + // TODO: double-check interaction with rebuild job logic + // TODO: cancel rebuild jobs?
+ let n = + self_ref.children.iter().filter(|c| c.is_opened()).count(); + + if n > 0 { + warn!( + "{:?}: {} open children remain(s), closing...", + self_ref, n + ); - for child in self_ref.children.iter() { - if child.is_opened() { - child.close().await.ok(); + for child in self_ref.children.iter() { + if child.is_opened() { + child.close().await.ok(); + } } } - } - self_ref.children.clear(); - }); + self_ref.children.clear(); + }); + } self.as_mut().unregister_io_device(); unsafe { diff --git a/io-engine/src/core/reactor.rs b/io-engine/src/core/reactor.rs index f2758bc99..e93fac017 100644 --- a/io-engine/src/core/reactor.rs +++ b/io-engine/src/core/reactor.rs @@ -362,8 +362,15 @@ impl Reactor { task } - /// spawn a future locally on the current core block until the future is + /// Spawns a future locally on the current core block until the future is /// completed. The master core is used. + /// # Warning + /// This code should only be used for testing and not for running production! + /// This is because when calling block_on from a thread_poll callback, we + /// may be leaving messages behind, which can lead to timeouts etc... + /// A work-around to make this safe could be to potentially "pull" the + /// messages which haven't been polled, and poll them here before + /// proceeding to re-poll via thread_poll again. pub fn block_on(future: F) -> Option where F: Future + 'static, diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs index b1c978646..70531a836 100644 --- a/io-engine/src/grpc/mod.rs +++ b/io-engine/src/grpc/mod.rs @@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver; use nix::errno::Errno; pub use server::MayastorGrpcServer; use std::{ - error::Error, fmt::{Debug, Display}, future::Future, time::Duration, }; @@ -158,22 +157,6 @@ macro_rules!
spdk_submit { pub type GrpcResult = std::result::Result, Status>; -/// call the given future within the context of the reactor on the first core -/// on the init thread, while the future is waiting to be completed the reactor -/// is continuously polled so that forward progress can be made -pub fn rpc_call(future: G) -> Result, tonic::Status> -where - G: Future> + 'static, - I: 'static, - L: Into + Error + 'static, - A: 'static + From, -{ - Reactor::block_on(future) - .unwrap() - .map(|r| Response::new(A::from(r))) - .map_err(|e| e.into()) -} - /// Submit rpc code to the primary reactor. pub fn rpc_submit( future: F,