Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(nexus/io): set nvmf subsystem as frozen #1415

Merged
merged 3 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mayastor/src/bdev/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use nexus_bdev::{
Error,
Nexus,
NexusNvmeParams,
NexusPauseState,
NexusState,
NexusStatus,
NexusTarget,
Expand Down
46 changes: 36 additions & 10 deletions mayastor/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,26 @@ pub enum NexusPauseState {
Unpaused,
Pausing,
Paused,
Frozen,
Unpausing,
}
impl From<super::nexus_io_subsystem::NexusPauseState> for NexusPauseState {
    /// Maps the I/O-subsystem-private pause state onto the publicly
    /// exported `NexusPauseState` enum, variant for variant.
    fn from(value: super::nexus_io_subsystem::NexusPauseState) -> Self {
        // Local alias keeps the exhaustive match readable; if a variant is
        // added to the private enum this match stops compiling, which is
        // exactly what we want.
        use super::nexus_io_subsystem::NexusPauseState as IoState;
        match value {
            IoState::Unpaused => Self::Unpaused,
            IoState::Pausing => Self::Pausing,
            IoState::Paused => Self::Paused,
            IoState::Frozen => Self::Frozen,
            IoState::Unpausing => Self::Unpausing,
        }
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum NvmeAnaState {
Expand Down Expand Up @@ -902,11 +920,28 @@ impl<'n> Nexus<'n> {
unsafe { self.get_unchecked_mut().io_subsystem.as_mut().unwrap() }
}

/// Get the subsystem pause state.
pub fn io_subsystem_state(&self) -> Option<NexusPauseState> {
self.io_subsystem.as_ref().map(|io| io.pause_state().into())
}

/// Resumes I/O to the Bdev.
/// Note: in order to handle concurrent resumes properly, this function must
/// be called only from the master core.
pub async fn resume(self: Pin<&mut Self>) -> Result<(), Error> {
    // If we are faulted then rather than failing all IO back to the
    // initiator we can instead leave the subsystem frozen, and wait
    // for the control-plane to do something about this.
    // Meanwhile the initiator will begin its reconnect loop and won't see
    // a swarm of IO failures which could cause a fs to shutdown.
    let freeze = self.status() == NexusStatus::Faulted;
    if freeze {
        tracing::warn!(?self, "Nexus Faulted: will not resume I/Os");
    }
    self.io_subsystem_mut().resume(freeze).await
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
Expand Down Expand Up @@ -1030,15 +1065,6 @@ impl<'n> Nexus<'n> {
})))
.await;
}
// If we are faulted then rather than failing all IO back to the
// initiator we can instead leave the subsystem paused, and wait
// for the control-plane to do something about this.
// Meanwhile the initiator will begin its reconnect loop and won't see
// a swarm of IO failures which could cause a fs to shutdown.
if self.status() == NexusStatus::Faulted {
tracing::warn!(?self, "Nexus Faulted: not resuming subsystem");
return Ok(());
}
debug!(?self, "RESUMING");
self.resume().await
}
Expand Down
46 changes: 33 additions & 13 deletions mayastor/src/bdev/nexus/nexus_io_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(super) enum NexusPauseState {
Unpaused,
Pausing,
Paused,
Frozen,
Unpausing,
}

Expand Down Expand Up @@ -51,6 +52,11 @@ impl<'n> NexusIoSubsystem<'n> {
}
}

/// Get the subsystem pause state.
///
/// This is a snapshot of the atomic state cell at the time of the call;
/// the state may change immediately afterwards on another core.
pub(super) fn pause_state(&self) -> NexusPauseState {
    self.pause_state.load()
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
/// handle internal events and which is a protocol feature.
/// In case concurrent pause requests take place, the other callers
Expand Down Expand Up @@ -99,7 +105,7 @@ impl<'n> NexusIoSubsystem<'n> {
break;
}
// Subsystem is already paused, increment number of paused.
Err(NexusPauseState::Paused) => {
Err(NexusPauseState::Paused | NexusPauseState::Frozen) => {
trace!(nexus=%self.name, "nexus is already paused, incrementing refcount");
self.pause_cnt.fetch_add(1, Ordering::SeqCst);
break;
Expand Down Expand Up @@ -133,13 +139,17 @@ impl<'n> NexusIoSubsystem<'n> {
/// Resume IO to the bdev.
/// Note: in order to handle concurrent resumes properly, this function must
/// be called only from the master core.
pub(super) async fn resume(&mut self) -> Result<(), NexusError> {
pub(super) async fn resume(
&mut self,
freeze: bool,
) -> Result<(), NexusError> {
assert_eq!(Cores::current(), Cores::first());

trace!(?self.name, "resuming nexus I/O");

loop {
match self.pause_state.load() {
let state = self.pause_state.load();
match state {
// Already unpaused, bail out.
NexusPauseState::Unpaused => {
break;
Expand All @@ -154,20 +164,30 @@ impl<'n> NexusIoSubsystem<'n> {
trace!(?self.name, "completed state transition, retrying Resume operation");
}
// Unpause the subsystem, taking into account the overall number
// of pauses.
NexusPauseState::Paused => {
// of pauses, or leave it frozen.
NexusPauseState::Paused | NexusPauseState::Frozen => {
let v = self.pause_cnt.fetch_sub(1, Ordering::SeqCst);
// In case the last pause discarded, resume the subsystem.
if v == 1 {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
self.pause_state.store(NexusPauseState::Unpausing);
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
subsystem.resume().await.unwrap();
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
if state == NexusPauseState::Frozen || freeze {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "not resuming frozen subsystem");
}
self.pause_state.store(NexusPauseState::Frozen);
} else {
if let Some(subsystem) =
NvmfSubsystem::nqn_lookup(&self.name)
{
self.pause_state
.store(NexusPauseState::Unpausing);
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "resuming subsystem");
subsystem.resume().await.unwrap();
trace!(nexus=%self.name, nqn=%subsystem.get_nqn(), "subsystem resumed");
}
self.pause_state.store(NexusPauseState::Unpaused);
}
self.pause_state.store(NexusPauseState::Unpaused);
}
break;
}
Expand Down
185 changes: 184 additions & 1 deletion mayastor/tests/nexus_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ use mayastor::{
bdev::nexus::{
nexus_create,
nexus_create_v2,
nexus_lookup,
nexus_lookup_mut,
NexusNvmeParams,
NexusPauseState,
NexusStatus,
NvmeAnaState,
},
core::{MayastorCliArgs, Protocol},
Expand All @@ -25,13 +28,17 @@ use rpc::mayastor::{
use std::process::{Command, ExitStatus};

pub mod common;
use common::{compose::Builder, MayastorTest};
use common::{
compose::{Builder, ComposeTest},
MayastorTest,
};

extern crate libnvme_rs;

static POOL_NAME: &str = "tpool";
static NXNAME: &str = "nexus0";
static UUID: &str = "cdc2a7db-3ac3-403a-af80-7fadc1581c47";
static UUID2: &str = "cdc2a7db-3ac3-403a-af80-7fadc1581c48";
static HOSTNQN: &str = "nqn.2019-05.io.openebs";
static HOSTID0: &str = "53b35ce9-8e71-49a9-ab9b-cba7c5670fad";
static HOSTID1: &str = "c1affd2d-ef79-4ba4-b5cf-8eb48f9c07d0";
Expand Down Expand Up @@ -610,3 +617,179 @@ async fn nexus_io_write_zeroes() {
})
.await;
}

#[tokio::test]
// Verifies the pause-state machine of a published nexus: a child-retire
// fault must leave the nvmf subsystem Frozen (not resumed), and pause/resume
// refcounting over a Frozen subsystem must keep it Frozen.
async fn nexus_io_freeze() {
    std::env::set_var("NEXUS_NVMF_ANA_ENABLE", "1");
    std::env::set_var("NEXUS_NVMF_RESV_ENABLE", "1");
    // create a new composeTest
    let test = Builder::new()
        .name("nexus_io_freeze")
        .network("10.1.0.0/16")
        .add_container("ms1")
        .with_clean(true)
        .build()
        .await
        .unwrap();
    create_pool_replicas(&test, 0).await;

    let mayastor = get_ms();
    let hdls = test.grpc_handles().await.unwrap();
    let ip0 = hdls[0].endpoint.ip();
    let nexus_name = format!("nexus-{}", UUID);
    // Both children live on the same remote container, so restarting it
    // faults the whole nexus.
    let nexus_children = [
        format!("nvmf://{}:8420/{}:{}", ip0, HOSTNQN, UUID),
        format!("nvmf://{}:8420/{}:{}", ip0, HOSTNQN, UUID2),
    ];

    let name = nexus_name.clone();
    let children = nexus_children.clone();
    mayastor
        .spawn(async move {
            // create nexus on local node with remote replica as child
            nexus_create(&name, 32 * 1024 * 1024, Some(UUID), &children)
                .await
                .unwrap();
            // publish nexus on local node over nvmf
            nexus_lookup_mut(&name)
                .unwrap()
                .share(Protocol::Nvmf, None)
                .await
                .unwrap();
            assert_eq!(
                nexus_pause_state(&name),
                Some(NexusPauseState::Unpaused)
            );
        })
        .await;

    // This will lead into a child retire, which means the nexus will be faulted
    // and subsystem frozen!
    test.restart("ms1").await.unwrap();
    wait_nexus_faulted(&nexus_name, std::time::Duration::from_secs(2))
        .await
        .unwrap();

    let name = nexus_name.clone();
    mayastor
        .spawn(async move {
            // Frozen is sticky: pause and resume cycles on a frozen
            // subsystem must not transition it back to Unpaused.
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().destroy().await.unwrap();
        })
        .await;

    // Recreate the pool/replicas for the second scenario: the fault happens
    // while the nexus is already explicitly Paused.
    create_pool_replicas(&test, 0).await;

    let name = nexus_name.clone();
    let children = nexus_children.clone();
    mayastor
        .spawn(async move {
            nexus_create(&name, 32 * 1024 * 1024, Some(UUID), &children)
                .await
                .unwrap();
            nexus_lookup_mut(&name)
                .unwrap()
                .share(Protocol::Nvmf, None)
                .await
                .unwrap();

            // Pause, so now WE must be the ones which resume to frozen!
            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));
        })
        .await;

    test.restart("ms1").await.unwrap();
    wait_nexus_faulted(&nexus_name, std::time::Duration::from_secs(2))
        .await
        .unwrap();

    let name = nexus_name.clone();
    mayastor
        .spawn(async move {
            // Pause refcount goes to 2; the first resume only drops it back
            // to 1, so the state stays Paused until the final resume.
            nexus_lookup_mut(&name).unwrap().pause().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));

            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Paused));

            // Final resume, transition to Frozen!
            nexus_lookup_mut(&name).unwrap().resume().await.unwrap();
            assert_eq!(nexus_pause_state(&name), Some(NexusPauseState::Frozen));

            nexus_lookup_mut(&name).unwrap().destroy().await.unwrap();
        })
        .await;
}

/// Looks the nexus up by name and returns its I/O subsystem pause state.
/// Panics if no nexus with that name exists.
fn nexus_pause_state(name: &str) -> Option<NexusPauseState> {
    let nexus = nexus_lookup(name).unwrap();
    nexus.io_subsystem_state()
}

/// Creates a malloc-backed pool on the container at `index` and two
/// replicas (UUID and UUID2) on it, both shared over nvmf.
///
/// Fix: the two `create_replica` calls were byte-identical apart from the
/// uuid — collapsed into a single loop over the two uuids.
async fn create_pool_replicas(test: &ComposeTest, index: usize) {
    let mut hdls = test.grpc_handles().await.unwrap();
    let hdl = &mut hdls[index];

    // create a pool on remote node
    hdl.mayastor
        .create_pool(CreatePoolRequest {
            name: POOL_NAME.to_string(),
            disks: vec!["malloc:///disk0?size_mb=128".into()],
        })
        .await
        .unwrap();

    // create both replicas, each shared over nvmf (share == 1)
    for uuid in [UUID, UUID2] {
        hdl.mayastor
            .create_replica(CreateReplicaRequest {
                uuid: uuid.to_string(),
                pool: POOL_NAME.to_string(),
                size: 32 * 1024 * 1024,
                thin: false,
                share: 1,
            })
            .await
            .unwrap();
    }
}

/// Polls the nexus status on the mayastor reactor until it reports
/// `NexusStatus::Faulted` or `timeout` elapses.
///
/// Returns `Err(elapsed)` with the time spent waiting on timeout.
async fn wait_nexus_faulted(
    name: &str,
    timeout: std::time::Duration,
) -> Result<(), std::time::Duration> {
    let mayastor = get_ms();
    let start = std::time::Instant::now();

    loop {
        // Guard clause: give up once the deadline has passed.
        if start.elapsed() > timeout {
            return Err(start.elapsed());
        }
        // The status check must run on the reactor, so move an owned copy
        // of the name into the spawned future.
        let name = name.to_string();
        let faulted = mayastor
            .spawn(async move {
                nexus_lookup(&name).unwrap().status() == NexusStatus::Faulted
            })
            .await;
        if faulted {
            return Ok(());
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}