From e77a8241ef45bff1c019031223c0f54a21d7bea7 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 18 Dec 2024 23:40:41 +0000 Subject: [PATCH] fix(self-shutdown): abort frozen ios when unsharing shutdown nexus These frozen IOs prevent the nexus from shutting down. We don't have any hooks today to do this whilsts the target is stopping so we add a simple loop which tries a number of times. Ideally we should get some sort of callback to trigger this. Signed-off-by: Tiago Castro --- io-engine/src/bdev/nexus/nexus_bdev.rs | 12 ++++ io-engine/src/bdev/nexus/nexus_channel.rs | 3 +- io-engine/src/bdev/nexus/nexus_share.rs | 39 +++++++++-- io-engine/tests/persistence.rs | 82 ++++++++++++++++++++--- libnvme-rs/src/nvme_uri.rs | 23 ++++++- 5 files changed, 142 insertions(+), 17 deletions(-) diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index a85baac81..22ea50318 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -1114,6 +1114,18 @@ impl<'n> Nexus<'n> { Ok(()) } + /// Aborts all frozen IOs of a shutdown Nexus. + /// # Warning + /// These aborts may translate into I/O errors for the initiator. + pub async fn abort_shutdown_frozen_ios(&self) { + if self.status() == NexusStatus::Shutdown { + self.traverse_io_channels_async((), |channel, _| { + channel.abort_frozen(); + }) + .await; + } + } + /// Suspend any incoming IO to the bdev pausing the controller allows us to /// handle internal events and which is a protocol feature. /// In case concurrent pause requests take place, the other callers diff --git a/io-engine/src/bdev/nexus/nexus_channel.rs b/io-engine/src/bdev/nexus/nexus_channel.rs index 2cc82626f..8047a95af 100644 --- a/io-engine/src/bdev/nexus/nexus_channel.rs +++ b/io-engine/src/bdev/nexus/nexus_channel.rs @@ -32,7 +32,7 @@ impl<'n> Debug for NexusChannel<'n> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]", + "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c} F:{f}]", io = if self.is_io_chan { "I/O" } else { "Aux" }, nex = self.nexus.nexus_name(), core = self.core, @@ -42,6 +42,7 @@ impl<'n> Debug for NexusChannel<'n> { d = self.detached.len(), l = self.io_logs.len(), c = self.nexus.child_count(), + f = self.frozen_ios.len() ) } } diff --git a/io-engine/src/bdev/nexus/nexus_share.rs b/io-engine/src/bdev/nexus/nexus_share.rs index 114ce7346..137c414e7 100644 --- a/io-engine/src/bdev/nexus/nexus_share.rs +++ b/io-engine/src/bdev/nexus/nexus_share.rs @@ -1,11 +1,13 @@ -use crate::bdev::PtplFileOps; +use super::{nexus_err, nexus_lookup, Error, NbdDisk, Nexus, NexusTarget}; +use crate::{ + bdev::PtplFileOps, + core::{NvmfShareProps, Protocol, PtplProps, Reactors, Share, UpdateProps}, + sleep::mayastor_sleep, +}; use async_trait::async_trait; +use futures::{channel::oneshot, future::FusedFuture}; use snafu::ResultExt; -use std::pin::Pin; - -use super::{nexus_err, Error, NbdDisk, Nexus, NexusTarget}; - -use crate::core::{NvmfShareProps, Protocol, PtplProps, Share, UpdateProps}; +use std::{pin::Pin, time::Duration}; /// /// The sharing of the nexus is different compared to regular bdevs @@ -78,6 +80,31 @@ impl<'n> Share for Nexus<'n> { async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> { info!("{:?}: unsharing nexus bdev...", self); + // TODO: we should not allow new initiator connections for a shutdown + // nexus! + + // TODO: we may want to disable the freeze at this point instead, + // allowing new I/Os to fail "normally" + let name = self.name.clone(); + let (_s, r) = oneshot::channel::<()>(); + Reactors::master().send_future(async move { + for _ in 0 ..= 10 { + mayastor_sleep(Duration::from_secs(2)).await.ok(); + if r.is_terminated() { + // This means the unshare is complete, so nothing to do here + return; + } + // If we're not unshared, then abort any I/Os that may have + // reached + if let Some(nexus) = nexus_lookup(&name) { + nexus.abort_shutdown_frozen_ios().await; + } + } + }); + + // Aborts frozen I/Os a priori + self.abort_shutdown_frozen_ios().await; + let name = self.name.clone(); self.as_mut().pin_bdev_mut().unshare().await.context( nexus_err::UnshareNexus { diff --git a/io-engine/tests/persistence.rs b/io-engine/tests/persistence.rs index abf9d6486..9a91d710f 100644 --- a/io-engine/tests/persistence.rs +++ b/io-engine/tests/persistence.rs @@ -1,4 +1,4 @@ -use crate::common::fio_run_verify; +use crate::common::{dd_random_file, fio_run_verify}; use common::compose::{ rpc::v0::{ mayastor::{ @@ -347,11 +347,80 @@ async fn persistent_store_connection() { assert!(get_nexus(ms1, nexus_uuid).await.is_some()); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn self_shutdown_destroy() { + let test = start_infrastructure_("self_shutdown_destroy", Some("1")).await; + let grpc = GrpcConnect::new(&test); + let ms1 = &mut grpc.grpc_handle("ms1").await.unwrap(); + let ms2 = &mut grpc.grpc_handle("ms2").await.unwrap(); + let ms3 = &mut grpc.grpc_handle("ms3").await.unwrap(); + + // Create bdevs and share over nvmf. + let child1 = create_and_share_bdevs(ms2, CHILD1_UUID).await; + let child2 = create_and_share_bdevs(ms3, CHILD2_UUID).await; + + // Create and publish a nexus. + let nexus_uuid = "8272e9d3-3738-4e33-b8c3-769d8eed5771"; + create_nexus(ms1, nexus_uuid, vec![child1.clone(), child2.clone()]).await; + let nexus_uri = publish_nexus(ms1, nexus_uuid).await; + + // Create and connect NVMF target. + let target = libnvme_rs::NvmeTarget::try_from(nexus_uri.clone()) + .unwrap() + .with_reconnect_delay(Some(1)) + .ctrl_loss_timeout(Some(1)) + .with_rand_hostnqn(true); + target.connect().unwrap(); + + // simulate node with child and etcd going down + test.stop("etcd").await.unwrap(); + test.stop("ms3").await.unwrap(); + + // allow pstor to timeout and self shutdown + // todo: use wait loop + tokio::time::sleep(Duration::from_secs(2)).await; + + let nexus = get_nexus(ms1, nexus_uuid).await.unwrap(); + assert_eq!(nexus.state, NexusState::NexusShutdown as i32); + + let devices = target.block_devices(2).unwrap(); + let fio_hdl = tokio::spawn(async move { + dd_random_file(&devices[0].to_string(), 4096, 1) + }); + + test.start("etcd").await.unwrap(); + + ms1.mayastor + .destroy_nexus(DestroyNexusRequest { + uuid: nexus_uuid.to_string(), + }) + .await + .expect("Failed to destroy nexus"); + + // Disconnect NVMF target + target.disconnect().unwrap(); + + fio_hdl.await.unwrap(); +} + /// Start the containers for the tests. async fn start_infrastructure(test_name: &str) -> ComposeTest { + start_infrastructure_(test_name, None).await +} + +/// Start the containers for the tests. +async fn start_infrastructure_( + test_name: &str, + ps_retries: Option<&str>, +) -> ComposeTest { common::composer_init(); let etcd_endpoint = format!("http://etcd.{test_name}:2379"); + let mut args = vec!["-p", &etcd_endpoint]; + if let Some(retries) = ps_retries { + args.extend(["--ps-retries", retries]); + } + Builder::new() .name(test_name) .add_container_spec( @@ -371,20 +440,17 @@ async fn start_infrastructure(test_name: &str) -> ComposeTest { ) .add_container_bin( "ms1", - Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]), + Binary::from_dbg("io-engine").with_args(args.clone()), ) .add_container_bin( "ms2", - Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]), + Binary::from_dbg("io-engine").with_args(args.clone()), ) .add_container_bin( "ms3", - Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]), - ) - .add_container_bin( - "ms4", - Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]), + Binary::from_dbg("io-engine").with_args(args.clone()), ) + .add_container_bin("ms4", Binary::from_dbg("io-engine").with_args(args)) .build() .await .unwrap() diff --git a/libnvme-rs/src/nvme_uri.rs b/libnvme-rs/src/nvme_uri.rs index 328b0e74d..6739decd8 100644 --- a/libnvme-rs/src/nvme_uri.rs +++ b/libnvme-rs/src/nvme_uri.rs @@ -64,6 +64,10 @@ pub struct NvmeTarget { trtype: NvmeTransportType, /// Auto-Generate random HostNqn. hostnqn_autogen: bool, + /// The Reconnect Delay. + reconnect_delay: Option, + /// The Controller Loss Timeout. + ctrl_loss_timeout: Option, } impl TryFrom for NvmeTarget { @@ -117,6 +121,8 @@ impl TryFrom<&str> for NvmeTarget { trsvcid: url.port().unwrap_or(4420), subsysnqn: subnqn, hostnqn_autogen: false, + reconnect_delay: None, + ctrl_loss_timeout: None, }) } } @@ -128,6 +134,16 @@ impl NvmeTarget { self.hostnqn_autogen = random; self } + /// With the reconnect delay. + pub fn with_reconnect_delay(mut self, delay: Option) -> Self { + self.reconnect_delay = delay; + self + } + /// With the ctrl loss timeout. + pub fn ctrl_loss_timeout(mut self, timeout: Option) -> Self { + self.ctrl_loss_timeout = timeout; + self + } /// Connect to NVMe target /// Returns Ok on successful connect pub fn connect(&self) -> Result<(), NvmeError> { @@ -184,8 +200,11 @@ impl NvmeTarget { host_iface, queue_size: 0, nr_io_queues: 0, - reconnect_delay: 0, - ctrl_loss_tmo: crate::NVMF_DEF_CTRL_LOSS_TMO as i32, + reconnect_delay: self.reconnect_delay.unwrap_or(0) as i32, + ctrl_loss_tmo: self + .ctrl_loss_timeout + .unwrap_or(crate::NVMF_DEF_CTRL_LOSS_TMO) + as i32, fast_io_fail_tmo: 0, keep_alive_tmo: 0, nr_write_queues: 0,