Skip to content

Commit

Permalink
fix(nexus): serialised nexus i/o suspend/resume
Browse files Browse the repository at this point in the history
Suspend/resume operations on NVMe subsystems are now serialised for nexuses,
which properly handles simultaneous I/O suspension/resume operations
in case multiple replicas get retired at the same time.

Resolves: CAS-1299
  • Loading branch information
dsavitskiy authored and tiagolobocastro committed Apr 25, 2022
1 parent bfd8a8a commit c27e943
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 157 deletions.
2 changes: 2 additions & 0 deletions mayastor/src/bdev/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod nexus_bdev_snapshot;
mod nexus_channel;
mod nexus_child;
mod nexus_io;
mod nexus_io_subsystem;
mod nexus_iter;
mod nexus_module;
mod nexus_nbd;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub use nexus_child::{
Reason,
};
pub(crate) use nexus_io::{nexus_submit_request, NioCtx};
pub(self) use nexus_io_subsystem::NexusIoSubsystem;
pub use nexus_iter::{
nexus_iter,
nexus_iter_mut,
Expand Down
176 changes: 45 additions & 131 deletions mayastor/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
use std::{
fmt::{Display, Formatter},
marker::PhantomPinned,
mem::MaybeUninit,
os::raw::c_void,
pin::Pin,
};

use crossbeam::atomic::AtomicCell;
use futures::channel::oneshot;
use nix::errno::Errno;
use serde::Serialize;
Expand All @@ -35,13 +33,15 @@ use super::{
};

use crate::{
bdev::{device_destroy, nexus::nexus_persistence::PersistentNexusInfo},
bdev::{
device_destroy,
nexus::{nexus_persistence::PersistentNexusInfo, NexusIoSubsystem},
},
core::{
Bdev,
BdevHandle,
Command,
CoreError,
Cores,
DeviceEventSink,
IoType,
Protocol,
Expand Down Expand Up @@ -453,8 +453,8 @@ pub struct Nexus<'n> {
pub(crate) nvme_params: NexusNvmeParams,
/// uuid of the nexus (might not be the same as the nexus bdev!)
nexus_uuid: Uuid,
/// raw pointer to bdev (to destruct it later using Box::from_raw())
bdev_obj: MaybeUninit<Bdev<Nexus<'n>>>,
/// Bdev wrapper instance.
bdev: Option<Bdev<Nexus<'n>>>,
/// represents the current state of the Nexus
pub state: parking_lot::Mutex<NexusState>,
/// The offset in blocks where the data partition starts.
Expand All @@ -466,11 +466,10 @@ pub struct Nexus<'n> {
pub nexus_target: Option<NexusTarget>,
/// Indicates if the Nexus has an I/O device.
has_io_device: bool,
/// Nexus pause counter to allow concurrent pause/resume.
pause_state: AtomicCell<NexusPauseState>,
pause_waiters: Vec<oneshot::Sender<i32>>,
/// Information associated with the persisted NexusInfo structure.
pub nexus_info: futures::lock::Mutex<PersistentNexusInfo>,
/// Nexus I/O subsystem.
io_subsystem: Option<NexusIoSubsystem<'n>>,
/// TODO
event_sink: Option<DeviceEventSink>,
/// Prevent auto-Unpin.
Expand Down Expand Up @@ -567,18 +566,17 @@ impl<'n> Nexus<'n> {
child_count: 0,
children: Vec::new(),
state: parking_lot::Mutex::new(NexusState::Init),
bdev_obj: MaybeUninit::uninit(),
bdev: None,
data_ent_offset: 0,
share_handle: None,
req_size: size,
nexus_target: None,
nvme_params,
has_io_device: false,
pause_state: AtomicCell::new(NexusPauseState::Unpaused),
pause_waiters: Vec::new(),
nexus_info: futures::lock::Mutex::new(PersistentNexusInfo::new(
nexus_info_key,
)),
io_subsystem: None,
nexus_uuid: Default::default(),
event_sink: None,
_pin: Default::default(),
Expand All @@ -597,14 +595,19 @@ impl<'n> Nexus<'n> {

unsafe {
let n = bdev.data_mut().get_unchecked_mut();
n.bdev_obj.write(
Bdev::checked_from_ptr(bdev.unsafe_inner_mut_ptr()).unwrap(),
);
n.bdev = Some(Bdev::new(bdev.clone()));

n.event_sink = Some(DeviceEventSink::new(bdev.data_mut()));

// Set the nexus UUID to be the specified nexus UUID, otherwise
// inherit the bdev UUID.
n.nexus_uuid = nexus_uuid.unwrap_or_else(|| n.bdev().uuid());

// Set I/O subsystem.
n.io_subsystem = Some(NexusIoSubsystem::new(
name.to_string(),
n.bdev.as_mut().unwrap(),
));
}

// register children
Expand Down Expand Up @@ -777,7 +780,7 @@ impl<'n> Nexus<'n> {
}
}

/// Destroy the nexus
/// Destroy the Nexus.
pub async fn destroy(mut self: Pin<&mut Self>) -> Result<(), Error> {
info!("Destroying nexus {}", self.name);

Expand Down Expand Up @@ -817,58 +820,16 @@ impl<'n> Nexus<'n> {
}
}

/// Resume IO to the bdev.
/// Returns a mutable reference to Nexus I/O.
fn io_subsystem_mut(self: Pin<&mut Self>) -> &mut NexusIoSubsystem<'n> {
unsafe { self.get_unchecked_mut().io_subsystem.as_mut().unwrap() }
}

/// Resumes I/O to the Bdev.
/// Note: in order to handle concurrent resumes properly, this function must
/// be called only from the master core.
pub async fn resume(self: Pin<&mut Self>) -> Result<(), Error> {
assert_eq!(Cores::current(), Cores::first());

// In case nexus is already unpaused or is being paused, bail out.
if matches!(
self.pause_state.load(),
NexusPauseState::Pausing | NexusPauseState::Unpaused
) {
return Ok(());
}

info!(
"{} resuming nexus, waiters: {}",
self.name,
self.pause_waiters.len(),
);

if let Some(Protocol::Nvmf) = self.shared() {
if self.pause_waiters.is_empty() {
if let Some(subsystem) = NvmfSubsystem::nqn_lookup(&self.name) {
self.pause_state.store(NexusPauseState::Unpausing);
subsystem.resume().await.unwrap();
// The trickiest case: a new waiter appeared during nexus
// unpausing. By the agreement we keep
// nexus paused for the waiters, so pause
// the nexus to restore status quo.
if !self.pause_waiters.is_empty() {
info!(
"{} concurrent nexus pausing requested during unpausing, re-pausing",
self.name,
);
subsystem.pause().await.unwrap();
self.pause_state.store(NexusPauseState::Paused);
}
}
}
}

// Keep the Nexus paused in case there are waiters.
if !self.pause_waiters.is_empty() {
let s = unsafe {
self.get_unchecked_mut().pause_waiters.pop().unwrap()
};
s.send(0).expect("Nexus pause waiter disappeared");
} else {
self.pause_state.store(NexusPauseState::Unpaused);
}

Ok(())
self.io_subsystem_mut().resume().await
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
Expand All @@ -878,66 +839,12 @@ impl<'n> Nexus<'n> {
/// with the nexus paused once they are awakened via resume().
/// Note: in order to handle concurrent pauses properly, this function must
/// be called only from the master core.
pub async fn pause(&self) -> Result<(), Error> {
assert_eq!(Cores::current(), Cores::first());

let state = self.pause_state.compare_exchange(
NexusPauseState::Unpaused,
NexusPauseState::Pausing,
);

match state {
// Pause nexus if it is in the unpaused state.
Ok(NexusPauseState::Unpaused) => {
if let Some(Protocol::Nvmf) = self.shared() {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
info!(
"{} pausing subsystem {}",
self.name,
subsystem.get_nqn()
);
subsystem.pause().await.unwrap();
info!(
"{} subsystem {} paused",
self.name,
subsystem.get_nqn()
);
}
}
// the fist pause will win
self.pause_state
.compare_exchange(
NexusPauseState::Pausing,
NexusPauseState::Paused,
)
.unwrap();
}

Err(NexusPauseState::Pausing) | Err(NexusPauseState::Paused) => {
// we are already pausing or paused
return Ok(());
}

// we must pause again, schedule pause operation
Err(NexusPauseState::Unpausing) => {
return Err(Error::Pause {
state: NexusPauseState::Unpausing,
name: self.name.clone(),
});
}
_ => {
panic!("Corrupted nexus state");
}
}

Ok(())
pub async fn pause(self: Pin<&mut Self>) -> Result<(), Error> {
self.io_subsystem_mut().suspend().await
}

// Abort all active I/O for target child and set I/O fail-fast flag
// for the child.

#[allow(dead_code)]
async fn update_failfast(
&self,
Expand All @@ -952,7 +859,6 @@ impl<'n> Nexus<'n> {
child,
};

// let io_device = self.io_device.as_ref().expect("Nexus not opened");
assert!(self.has_io_device);

self.traverse_io_channels(
Expand Down Expand Up @@ -995,13 +901,13 @@ impl<'n> Nexus<'n> {
}

pub async fn child_retire(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
name: String,
) -> Result<(), Error> {
self.child_retire_for_each_channel(Some(name.clone()))
.await?;
debug!(?self, "PAUSE");
self.pause().await?;
self.as_mut().pause().await?;
debug!(?self, "UNPAUSE");
if let Some(child) = self.lookup_child(&name) {
let uri = child.name.clone();
Expand Down Expand Up @@ -1149,16 +1055,24 @@ impl<'n> Nexus<'n> {

// Unsafe part of Nexus.
impl<'n> Nexus<'n> {
/// TODO
pub(crate) unsafe fn bdev(&self) -> &Bdev<Nexus<'n>> {
&*self.bdev_obj.as_ptr()
/// Returns a reference to Nexus's Bdev.
pub(super) unsafe fn bdev(&self) -> &Bdev<Nexus<'n>> {
self.bdev
.as_ref()
.expect("Nexus Bdev object is not initialized")
}

/// TODO
pub(crate) unsafe fn bdev_mut(
/// Returns a mutable reference to Nexus's Bdev.
pub(super) unsafe fn bdev_mut(
self: Pin<&mut Self>,
) -> &'n mut Bdev<Nexus<'n>> {
&mut *self.get_unchecked_mut().bdev_obj.as_mut_ptr()
) -> &mut Bdev<Nexus<'n>> {
self.get_unchecked_mut().bdev.as_mut().unwrap()
}

/// Returns a pinned Bdev reference to allow calling methods that require a
/// Pin<&mut>, e.g. methods of Share trait.
pub(super) fn pin_bdev_mut(self: Pin<&mut Self>) -> Pin<&mut Bdev<Self>> {
unsafe { Pin::new_unchecked(self.bdev_mut()) }
}

/// Sets the required alignment of the Nexus.
Expand Down
2 changes: 1 addition & 1 deletion mayastor/src/bdev/nexus/nexus_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl<'n> NexusBio<'n> {

/// assess the IO if we need to mark it failed or ok.
/// obtain the Nexus struct embedded within the bdev
pub(crate) fn nexus_as_ref(&self) -> Pin<&Nexus> {
pub(crate) fn nexus_as_ref(&self) -> Pin<&Nexus<'n>> {
self.bdev_checked(NEXUS_PRODUCT_ID).data()
}

Expand Down
Loading

0 comments on commit c27e943

Please sign in to comment.