diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs
index a85baac81..22ea50318 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev.rs
@@ -1114,6 +1114,18 @@ impl<'n> Nexus<'n> {
         Ok(())
     }
 
+    /// Aborts all frozen I/Os of a shutdown Nexus.
+    /// # Warning
+    /// These aborts may translate into I/O errors for the initiator.
+    pub async fn abort_shutdown_frozen_ios(&self) {
+        if self.status() == NexusStatus::Shutdown {
+            self.traverse_io_channels_async((), |channel, _| {
+                channel.abort_frozen();
+            })
+            .await;
+        }
+    }
+
     /// Suspend any incoming IO to the bdev pausing the controller allows us to
     /// handle internal events and which is a protocol feature.
     /// In case concurrent pause requests take place, the other callers
diff --git a/io-engine/src/bdev/nexus/nexus_channel.rs b/io-engine/src/bdev/nexus/nexus_channel.rs
index 2cc82626f..8047a95af 100644
--- a/io-engine/src/bdev/nexus/nexus_channel.rs
+++ b/io-engine/src/bdev/nexus/nexus_channel.rs
@@ -32,7 +32,7 @@ impl<'n> Debug for NexusChannel<'n> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
+            "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c} F:{f}]",
            io = if self.is_io_chan { "I/O" } else { "Aux" },
             nex = self.nexus.nexus_name(),
             core = self.core,
@@ -42,6 +42,7 @@ impl<'n> Debug for NexusChannel<'n> {
             d = self.detached.len(),
             l = self.io_logs.len(),
             c = self.nexus.child_count(),
+            f = self.frozen_ios.len()
         )
     }
 }
diff --git a/io-engine/src/bdev/nexus/nexus_share.rs b/io-engine/src/bdev/nexus/nexus_share.rs
index 114ce7346..137c414e7 100644
--- a/io-engine/src/bdev/nexus/nexus_share.rs
+++ b/io-engine/src/bdev/nexus/nexus_share.rs
@@ -1,11 +1,13 @@
-use crate::bdev::PtplFileOps;
+use super::{nexus_err, nexus_lookup, Error, NbdDisk, Nexus, NexusTarget};
+use crate::{
+    bdev::PtplFileOps,
+    core::{NvmfShareProps, Protocol, PtplProps, Reactors, Share, UpdateProps},
+    sleep::mayastor_sleep,
+};
 use async_trait::async_trait;
+use futures::{channel::oneshot, future::FusedFuture};
 use snafu::ResultExt;
-use std::pin::Pin;
-
-use super::{nexus_err, Error, NbdDisk, Nexus, NexusTarget};
-
-use crate::core::{NvmfShareProps, Protocol, PtplProps, Share, UpdateProps};
+use std::{pin::Pin, time::Duration};
 
 ///
 /// The sharing of the nexus is different compared to regular bdevs
@@ -78,6 +80,31 @@ impl<'n> Share for Nexus<'n> {
     async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
         info!("{:?}: unsharing nexus bdev...", self);
 
+        // TODO: we should not allow new initiator connections for a shutdown
+        // nexus!
+
+        // TODO: we may want to disable the freeze at this point instead,
+        // allowing new I/Os to fail "normally".
+        let name = self.name.clone();
+        let (_s, r) = oneshot::channel::<()>();
+        Reactors::master().send_future(async move {
+            for _ in 0 ..= 10 {
+                mayastor_sleep(Duration::from_secs(2)).await.ok();
+                if r.is_terminated() {
+                    // The unshare has completed, so there is nothing to do.
+                    return;
+                }
+                // Still shared: abort any I/Os that may have reached the
+                // nexus in the meantime.
+                if let Some(nexus) = nexus_lookup(&name) {
+                    nexus.abort_shutdown_frozen_ios().await;
+                }
+            }
+        });
+
+        // Abort any already-frozen I/Os up front.
+        self.abort_shutdown_frozen_ios().await;
+
         let name = self.name.clone();
         self.as_mut().pin_bdev_mut().unshare().await.context(
             nexus_err::UnshareNexus {
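Note on the unshare hunk above: the watchdog future runs on the master reactor, aborting frozen I/Os every two seconds until the unshare completes (signalled through the oneshot channel when `_s` is dropped), bounded at eleven iterations. Below is a minimal standalone sketch of the same drop-signaled watchdog pattern in plain tokio; `abort_frozen_ios` and `do_unshare` are hypothetical stand-ins, and tokio primitives replace the io-engine's `Reactors::master()` and `mayastor_sleep` machinery:

```rust
use std::time::Duration;
use tokio::sync::oneshot;

async fn abort_frozen_ios() { /* hypothetical stand-in */ }
async fn do_unshare() { /* hypothetical stand-in */ }

async fn unshare_with_watchdog() {
    // Hold the sender across the unshare; dropping it signals completion.
    let (done_tx, mut done_rx) = oneshot::channel::<()>();

    tokio::spawn(async move {
        for _ in 0..=10 {
            tokio::time::sleep(Duration::from_secs(2)).await;
            // try_recv() reports Closed once `done_tx` has been dropped.
            if matches!(done_rx.try_recv(), Err(oneshot::error::TryRecvError::Closed)) {
                return; // unshare finished; nothing left to abort
            }
            // Still unsharing: abort anything frozen in the meantime.
            abort_frozen_ios().await;
        }
    });

    do_unshare().await;
    drop(done_tx); // stops the watchdog on its next tick
}
```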
diff --git a/io-engine/tests/persistence.rs b/io-engine/tests/persistence.rs
index abf9d6486..9a91d710f 100644
--- a/io-engine/tests/persistence.rs
+++ b/io-engine/tests/persistence.rs
@@ -1,4 +1,4 @@
-use crate::common::fio_run_verify;
+use crate::common::{dd_random_file, fio_run_verify};
 use common::compose::{
     rpc::v0::{
         mayastor::{
@@ -347,11 +347,80 @@ async fn persistent_store_connection() {
     assert!(get_nexus(ms1, nexus_uuid).await.is_some());
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn self_shutdown_destroy() {
+    let test = start_infrastructure_("self_shutdown_destroy", Some("1")).await;
+    let grpc = GrpcConnect::new(&test);
+    let ms1 = &mut grpc.grpc_handle("ms1").await.unwrap();
+    let ms2 = &mut grpc.grpc_handle("ms2").await.unwrap();
+    let ms3 = &mut grpc.grpc_handle("ms3").await.unwrap();
+
+    // Create bdevs and share over nvmf.
+    let child1 = create_and_share_bdevs(ms2, CHILD1_UUID).await;
+    let child2 = create_and_share_bdevs(ms3, CHILD2_UUID).await;
+
+    // Create and publish a nexus.
+    let nexus_uuid = "8272e9d3-3738-4e33-b8c3-769d8eed5771";
+    create_nexus(ms1, nexus_uuid, vec![child1.clone(), child2.clone()]).await;
+    let nexus_uri = publish_nexus(ms1, nexus_uuid).await;
+
+    // Create and connect the NVMF target.
+    let target = libnvme_rs::NvmeTarget::try_from(nexus_uri.clone())
+        .unwrap()
+        .with_reconnect_delay(Some(1))
+        .ctrl_loss_timeout(Some(1))
+        .with_rand_hostnqn(true);
+    target.connect().unwrap();
+
+    // Simulate the node hosting a child and etcd both going down.
+    test.stop("etcd").await.unwrap();
+    test.stop("ms3").await.unwrap();
+
+    // Allow pstor to time out and the nexus to self-shutdown.
+    // todo: use a wait loop (see the sketch below)
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    let nexus = get_nexus(ms1, nexus_uuid).await.unwrap();
+    assert_eq!(nexus.state, NexusState::NexusShutdown as i32);
+
+    let devices = target.block_devices(2).unwrap();
+    let fio_hdl = tokio::spawn(async move {
+        dd_random_file(&devices[0].to_string(), 4096, 1)
+    });
+
+    test.start("etcd").await.unwrap();
+
+    ms1.mayastor
+        .destroy_nexus(DestroyNexusRequest {
+            uuid: nexus_uuid.to_string(),
+        })
+        .await
+        .expect("Failed to destroy nexus");
+
+    // Disconnect the NVMF target.
+    target.disconnect().unwrap();
+
+    fio_hdl.await.unwrap();
+}
+
 /// Start the containers for the tests.
 async fn start_infrastructure(test_name: &str) -> ComposeTest {
+    start_infrastructure_(test_name, None).await
+}
+
+/// Start the containers for the tests, with an optional pstor retry count.
+async fn start_infrastructure_(
+    test_name: &str,
+    ps_retries: Option<&str>,
+) -> ComposeTest {
     common::composer_init();
     let etcd_endpoint = format!("http://etcd.{test_name}:2379");
 
+    let mut args = vec!["-p", &etcd_endpoint];
+    if let Some(retries) = ps_retries {
+        args.extend(["--ps-retries", retries]);
+    }
+
     Builder::new()
         .name(test_name)
         .add_container_spec(
@@ -371,20 +440,17 @@ async fn start_infrastructure(test_name: &str) -> ComposeTest {
         )
         .add_container_bin(
             "ms1",
-            Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
+            Binary::from_dbg("io-engine").with_args(args.clone()),
         )
         .add_container_bin(
             "ms2",
-            Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
+            Binary::from_dbg("io-engine").with_args(args.clone()),
         )
         .add_container_bin(
             "ms3",
-            Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
-        )
-        .add_container_bin(
-            "ms4",
-            Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
+            Binary::from_dbg("io-engine").with_args(args.clone()),
         )
+        .add_container_bin("ms4", Binary::from_dbg("io-engine").with_args(args))
         .build()
         .await
         .unwrap()
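On the `todo: use a wait loop` above: the fixed two-second sleep could be replaced by polling the nexus state until it reports Shutdown. A sketch in terms of the test's existing `get_nexus` helper and `NexusState` enum; the `RpcHandle` type name, the 30-second budget, and the 250 ms poll interval are assumptions:

```rust
// Hypothetical helper: poll until the nexus reaches Shutdown or a deadline
// passes, rather than sleeping for a fixed interval.
async fn wait_nexus_shutdown(ms: &mut RpcHandle, uuid: &str) {
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
    loop {
        if let Some(nexus) = get_nexus(ms, uuid).await {
            if nexus.state == NexusState::NexusShutdown as i32 {
                return;
            }
        }
        assert!(
            std::time::Instant::now() < deadline,
            "nexus did not reach Shutdown in time"
        );
        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
    }
}
```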
diff --git a/libnvme-rs/src/nvme_uri.rs b/libnvme-rs/src/nvme_uri.rs
index 328b0e74d..6739decd8 100644
--- a/libnvme-rs/src/nvme_uri.rs
+++ b/libnvme-rs/src/nvme_uri.rs
@@ -64,6 +64,10 @@ pub struct NvmeTarget {
     trtype: NvmeTransportType,
     /// Auto-Generate random HostNqn.
     hostnqn_autogen: bool,
+    /// The reconnect delay, in seconds.
+    reconnect_delay: Option<u32>,
+    /// The controller loss timeout, in seconds.
+    ctrl_loss_timeout: Option<u32>,
 }
 
 impl TryFrom<String> for NvmeTarget {
@@ -117,6 +121,8 @@ impl TryFrom<&str> for NvmeTarget {
             trsvcid: url.port().unwrap_or(4420),
             subsysnqn: subnqn,
             hostnqn_autogen: false,
+            reconnect_delay: None,
+            ctrl_loss_timeout: None,
         })
     }
 }
@@ -128,6 +134,16 @@ impl NvmeTarget {
         self.hostnqn_autogen = random;
         self
     }
+    /// Set the reconnect delay, in seconds.
+    pub fn with_reconnect_delay(mut self, delay: Option<u32>) -> Self {
+        self.reconnect_delay = delay;
+        self
+    }
+    /// Set the controller loss timeout, in seconds.
+    pub fn ctrl_loss_timeout(mut self, timeout: Option<u32>) -> Self {
+        self.ctrl_loss_timeout = timeout;
+        self
+    }
     /// Connect to NVMe target
     /// Returns Ok on successful connect
     pub fn connect(&self) -> Result<(), NvmeError> {
@@ -184,8 +200,11 @@ impl NvmeTarget {
             host_iface,
             queue_size: 0,
             nr_io_queues: 0,
-            reconnect_delay: 0,
-            ctrl_loss_tmo: crate::NVMF_DEF_CTRL_LOSS_TMO as i32,
+            reconnect_delay: self.reconnect_delay.unwrap_or(0) as i32,
+            ctrl_loss_tmo: self
+                .ctrl_loss_timeout
+                .unwrap_or(crate::NVMF_DEF_CTRL_LOSS_TMO)
+                as i32,
             fast_io_fail_tmo: 0,
             keep_alive_tmo: 0,
             nr_write_queues: 0,
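Usage note on the two new builder options: left unset, they preserve the previous behaviour (`reconnect_delay` 0 and the `NVMF_DEF_CTRL_LOSS_TMO` default), so existing callers are unaffected. A sketch of a caller opting in, mirroring the persistence test above; the URI is hypothetical:

```rust
use std::convert::TryFrom;

fn connect_fast_fail() -> Result<(), libnvme_rs::NvmeError> {
    // Reconnect every second and declare the controller lost after one
    // second, so I/O to a vanished target fails quickly instead of hanging.
    let target = libnvme_rs::NvmeTarget::try_from(
        "nvmf://10.1.0.2:8420/nqn.2019-05.io.openebs:nexus",
    )
    .unwrap()
    .with_reconnect_delay(Some(1))
    .ctrl_loss_timeout(Some(1))
    .with_rand_hostnqn(true);
    target.connect()
}
```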