Skip to content

Commit

Permalink
chore: add warning to block_on
Browse files Browse the repository at this point in the history
Should this become an unsafe function?

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Aug 9, 2024
1 parent d5b5d44 commit d9e19b3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 38 deletions.
48 changes: 28 additions & 20 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1373,30 +1373,38 @@ impl<'n> BdevOps for Nexus<'n> {
return;
}

let self_ptr = unsafe { unsafe_static_ptr(&self) };

Reactor::block_on(async move {
let self_ref = unsafe { &mut *self_ptr };

// TODO: double-check interaction with rebuild job logic
// TODO: cancel rebuild jobs?
let n = self_ref.children.iter().filter(|c| c.is_opened()).count();

if n > 0 {
warn!(
"{:?}: {} open children remain(s), closing...",
self_ref, n
);
let open_children =
self.children.iter().filter(|c| c.is_opened()).count();
// TODO: This doesn't seem possible to happen at this stage, but seems
// we should still try to handle this in separate future since
// we're handling it here anyway as a block_on is not safe to
// use for running production code.
if open_children > 0 {
let self_ptr = unsafe { unsafe_static_ptr(&self) };
Reactor::block_on(async move {
let self_ref = unsafe { &mut *self_ptr };

// TODO: double-check interaction with rebuild job logic
// TODO: cancel rebuild jobs?
let n =
self_ref.children.iter().filter(|c| c.is_opened()).count();

if n > 0 {
warn!(
"{:?}: {} open children remain(s), closing...",
self_ref, n
);

for child in self_ref.children.iter() {
if child.is_opened() {
child.close().await.ok();
for child in self_ref.children.iter() {
if child.is_opened() {
child.close().await.ok();
}
}
}
}

self_ref.children.clear();
});
self_ref.children.clear();
});
}

self.as_mut().unregister_io_device();
unsafe {
Expand Down
9 changes: 8 additions & 1 deletion io-engine/src/core/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,15 @@ impl Reactor {
task
}

/// spawn a future locally on the current core block until the future is
/// Spawns a future locally on the current core block until the future is
/// completed. The master core is used.
/// # Warning
/// This code should only be used for testing and not running production!
/// This is because when calling block_on from a thread_poll callback, we
/// may be leaving messages behind, which can lead to timeouts etc...
/// A work-around to make this safe could be to potentially "pull" the
/// messages which haven't been polled, and poll them here before
/// proceeding to re-poll via thread_poll again.
pub fn block_on<F, R>(future: F) -> Option<R>
where
F: Future<Output = R> + 'static,
Expand Down
17 changes: 0 additions & 17 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
Expand Down Expand Up @@ -158,22 +157,6 @@ macro_rules! spdk_submit {

pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;

/// call the given future within the context of the reactor on the first core
/// on the init thread, while the future is waiting to be completed the reactor
/// is continuously polled so that forward progress can be made
pub fn rpc_call<G, I, L, A>(future: G) -> Result<Response<A>, tonic::Status>
where
G: Future<Output = Result<I, L>> + 'static,
I: 'static,
L: Into<Status> + Error + 'static,
A: 'static + From<I>,
{
Reactor::block_on(future)
.unwrap()
.map(|r| Response::new(A::from(r)))
.map_err(|e| e.into())
}

/// Submit rpc code to the primary reactor.
pub fn rpc_submit<F, R, E>(
future: F,
Expand Down

0 comments on commit d9e19b3

Please sign in to comment.