diff --git a/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs b/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
index 57cabcb78..be81b5fd6 100644
--- a/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
+++ b/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
@@ -140,7 +140,7 @@ async fn find_shutdown_volumes(
     // else the drain operation is timed out
     // todo: this should be handled better..
-    if platform::current_plaform_type() == platform::PlatformType::Deployer {
+    if platform::current_platform_type() == platform::PlatformType::Deployer {
         for vi in node_spec.node_draining_volumes().await {
             let mut volume = context.specs().volume(&vi).await?;
             let request = DestroyShutdownTargets::new(vi, None);
diff --git a/control-plane/agents/src/bin/ha/node/path_provider.rs b/control-plane/agents/src/bin/ha/node/path_provider.rs
index 5b23426aa..26c3d0092 100755
--- a/control-plane/agents/src/bin/ha/node/path_provider.rs
+++ b/control-plane/agents/src/bin/ha/node/path_provider.rs
@@ -1,5 +1,5 @@
 use nvmeadm::nvmf_subsystem::{NvmeSubsystems, Subsystem};
-use stor_port::platform::{current_plaform_type, PlatformType};
+use stor_port::platform::{current_platform_type, PlatformType};
 use utils::NVME_TARGET_NQN_PREFIX;

 #[cfg(target_os = "linux")]
@@ -121,7 +121,7 @@ impl CachedNvmePathProvider {
     }
     #[cfg(target_os = "linux")]
     fn udev_supported() -> bool {
-        match current_plaform_type() {
+        match current_platform_type() {
             PlatformType::K8s => true,
             PlatformType::None => true,
             PlatformType::Deployer => false,
diff --git a/control-plane/csi-driver/src/bin/controller/client.rs b/control-plane/csi-driver/src/bin/controller/client.rs
index 15f266b73..fe31c3a78 100644
--- a/control-plane/csi-driver/src/bin/controller/client.rs
+++ b/control-plane/csi-driver/src/bin/controller/client.rs
@@ -148,6 +148,12 @@ impl IoEngineApiClient {
     }
 }

+/// Token used to list volumes with pagination.
+pub(crate) enum ListToken {
+    String(String),
+    Number(isize),
+}
+
 impl IoEngineApiClient {
     /// List all nodes available in IoEngine cluster.
     pub(crate) async fn list_nodes(&self) -> Result<Vec<Node>, ApiClientError> {
@@ -178,17 +184,17 @@ impl IoEngineApiClient {
     pub(crate) async fn list_volumes(
         &self,
         max_entries: i32,
-        starting_token: String,
+        starting_token: ListToken,
     ) -> Result<Volumes, ApiClientError> {
         let max_entries = max_entries as isize;
-        let starting_token = if starting_token.is_empty() {
-            0
-        } else {
-            starting_token.parse::<isize>().map_err(|_| {
+        let starting_token = match starting_token {
+            ListToken::String(starting_token) if starting_token.is_empty() => 0,
+            ListToken::String(starting_token) => starting_token.parse::<isize>().map_err(|_| {
                 ApiClientError::InvalidArgument(
                     "Failed to parse starting token as an isize".to_string(),
                 )
-            })?
+            })?,
+            ListToken::Number(starting_token) => starting_token,
         };

         let response = self
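The ListToken change above keeps CSI-facing callers on the opaque string token that gRPC hands them, while letting internal callers pass a numeric offset directly. A self-contained sketch of the normalization rule the new match implements (the enum shape and error message mirror the hunk above; the free functions are illustrative only, not part of the patch):

    // Two token flavours: CSI hands the controller an opaque string, while
    // internal callers such as the PV garbage collector hold a numeric offset.
    enum ListToken {
        String(String),
        Number(isize),
    }

    fn normalize(token: ListToken) -> Result<isize, String> {
        match token {
            // An empty CSI token means "start from the first entry".
            ListToken::String(s) if s.is_empty() => Ok(0),
            // Otherwise the string must encode a numeric offset.
            ListToken::String(s) => s
                .parse::<isize>()
                .map_err(|_| "Failed to parse starting token as an isize".to_string()),
            // Internal callers skip the string round-trip entirely.
            ListToken::Number(n) => Ok(n),
        }
    }

    fn main() {
        assert_eq!(normalize(ListToken::String(String::new())), Ok(0));
        assert_eq!(normalize(ListToken::String("42".into())), Ok(42));
        assert_eq!(normalize(ListToken::Number(7)), Ok(7));
    }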
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index af79cde11..7964ef8dd 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -1,4 +1,6 @@
-use crate::{ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient};
+use crate::{
+    client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
+};
 use csi_driver::context::{CreateParams, PublishParams};
 use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};

@@ -652,7 +654,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
         let vt_mapper = VolumeTopologyMapper::init().await?;

         let volumes = IoEngineApiClient::get_client()
-            .list_volumes(max_entries, args.starting_token)
+            .list_volumes(max_entries, ListToken::String(args.starting_token))
             .await
             .map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;
diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs
index b3567d6a9..56ab2c732 100644
--- a/control-plane/csi-driver/src/bin/controller/main.rs
+++ b/control-plane/csi-driver/src/bin/controller/main.rs
@@ -19,10 +19,26 @@ const REST_TIMEOUT: &str = "30s";
 fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
     CsiControllerConfig::initialize(args)?;
     IoEngineApiClient::initialize()
-        .map_err(|e| anyhow::anyhow!("Failed to initialize API client, error = {}", e))?;
+        .map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
     Ok(())
 }

+#[tracing::instrument]
+async fn ping_rest_api() {
+    info!("Checking REST API endpoint accessibility ...");
+
+    match IoEngineApiClient::get_client().list_nodes().await {
+        Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
+        Ok(nodes) => {
+            let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
+            info!(
+                "REST API endpoint available, {len} IoEngine node(s) reported: {names:?}",
+                len = names.len(),
+            );
+        }
+    }
+}
+
 #[tokio::main(worker_threads = 2)]
 async fn main() -> anyhow::Result<()> {
     let args = clap::Command::new(utils::package_description!())
@@ -33,7 +49,7 @@ async fn main() -> anyhow::Result<()> {
                 .short('r')
                 .env("ENDPOINT")
                 .default_value("http://ksnode-1:30011")
-                .help("a URL endpoint to the control plane's rest endpoint"),
+                .help("A URL endpoint to the control plane's rest endpoint"),
         )
         .arg(
             Arg::new("socket")
                 .long("csi-socket")
                 .short('c')
                 .env("CSI_SOCKET")
                 .default_value(CSI_SOCKET)
-                .help("CSI socket path"),
+                .help("The CSI socket path"),
         )
         .arg(
             Arg::new("jaeger")
                 .short('j')
                 .long("jaeger")
                 .env("JAEGER_ENDPOINT")
-                .help("enable open telemetry and forward to jaeger"),
+                .help("Enable open telemetry and forward to jaeger"),
         )
         .arg(
             Arg::new("timeout")
@@ -78,6 +94,15 @@ async fn main() -> anyhow::Result<()> {
                     "The number of worker threads that process requests"
                 ),
         )
+        .arg(
+            Arg::new("orphan-vol-gc-period")
+                .long("orphan-vol-gc-period")
+                .default_value("10m")
+                .help(
+                    "How often to check and delete orphaned volumes. \n\
+                    An orphan volume is a volume with no corresponding PV",
+                )
+        )
         .get_matches();

     utils::print_package_info!();
@@ -91,6 +116,13 @@ async fn main() -> anyhow::Result<()> {
         tags,
         args.get_one::<String>("jaeger").cloned(),
     );
+    let orphan_period = args
+        .get_one::<String>("orphan-vol-gc-period")
+        .map(|p| p.parse::<humantime::Duration>())
+        .transpose()?;
+    let csi_socket = args
+        .get_one::<String>("socket")
+        .expect("CSI socket must be specified");

     initialize_controller(&args)?;

@@ -99,18 +131,16 @@ async fn main() -> anyhow::Result<()> {
         CsiControllerConfig::get_config().rest_endpoint()
     );

+    // Try to detect REST API endpoint to debug the accessibility status.
+    ping_rest_api().await;
+
     // Starts PV Garbage Collector if platform type is k8s
-    if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
-        let gc_instance = pvwatcher::PvGarbageCollector::new().await?;
+    if stor_port::platform::current_platform_type() == stor_port::platform::PlatformType::K8s {
+        let gc_instance = pvwatcher::PvGarbageCollector::new(orphan_period).await?;
         tokio::spawn(async move { gc_instance.run_watcher().await });
     }

-    let result = server::CsiServer::run(
-        args.get_one::<String>("socket")
-            .expect("CSI socket must be specified")
-            .clone(),
-    )
-    .await;
+    let result = server::CsiServer::run(csi_socket).await;
     utils::tracing_telemetry::flush_traces();
     result
 }
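The new --orphan-vol-gc-period flag defaults to "10m" and is parsed before being handed to the garbage collector. A minimal sketch of that round-trip, assuming the humantime crate (its use is implied by parse::<humantime::Duration>() above and by interval(period.into()) in the pvwatcher below):

    // Parse a human-readable period the way the flag value is parsed above,
    // then convert it into the std duration that tokio's interval expects.
    fn main() -> Result<(), humantime::DurationError> {
        let period: humantime::Duration = "10m".parse()?;
        let std_period: std::time::Duration = period.into();
        assert_eq!(std_period.as_secs(), 600);
        // humantime also accepts compound forms such as "1h 30m" or "90s".
        println!("GC period: {period}");
        Ok(())
    }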
diff --git a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
index 69e4140a5..8de1287b2 100644
--- a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
+++ b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
@@ -1,3 +1,4 @@
+use crate::client::ListToken;
 use futures::TryStreamExt;
 use k8s_openapi::api::core::v1::PersistentVolume;
 use kube::{
@@ -6,34 +7,30 @@ use kube::{
     Client, ResourceExt,
 };
 use tracing::{debug, error, info};
-use uuid::Uuid;
-
-use csi_driver::CSI_PLUGIN_NAME;
-
-use crate::IoEngineApiClient;

 /// Struct for PV Garbage collector
 #[derive(Clone)]
 pub(crate) struct PvGarbageCollector {
     pub(crate) pv_handle: Api<PersistentVolume>,
+    orphan_period: Option<humantime::Duration>,
+    rest_client: &'static crate::IoEngineApiClient,
 }

 /// Methods implemented by PV Garbage Collector
 impl PvGarbageCollector {
     /// Returns an instance of PV Garbage collector
-    pub(crate) async fn new() -> anyhow::Result<Self> {
+    pub(crate) async fn new(orphan_period: Option<humantime::Duration>) -> anyhow::Result<Self> {
         let client = Client::try_default().await?;
         Ok(Self {
             pv_handle: Api::<PersistentVolume>::all(client),
+            orphan_period,
+            rest_client: crate::IoEngineApiClient::get_client(),
         })
     }

-    /// Starts watching PV events
+    /// Starts watching PV events.
     pub(crate) async fn run_watcher(&self) {
+        tokio::spawn(self.clone().orphan_volumes_watcher());
         info!("Starting PV Garbage Collector");
-        let cloned_self = self.clone();
-        tokio::spawn(async move {
-            cloned_self.handle_missed_events().await;
-        });
         watcher(self.pv_handle.clone(), watcher::Config::default())
             .touched_objects()
             .try_for_each(|pvol| async {
                 self.process_object(pvol).await;
                 Ok(())
             })
             .await
             .expect("Watcher unexpectedly terminated");
     }

-    async fn process_object(&self, pv: PersistentVolume) {
-        if pv.metadata.clone().deletion_timestamp.is_none() {
-            return;
-        }
-        if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-            if provisioner.driver != CSI_PLUGIN_NAME {
-                return;
-            }
-        }
+    async fn process_object(&self, pv: PersistentVolume) -> Option<()> {
+        let pv_name = pv.name_any();
+
+        pv.metadata.deletion_timestamp?;
+
+        let pv_spec = pv.spec?;
+        let volume = pv_spec.csi?;
+        if volume.driver != csi_driver::CSI_PLUGIN_NAME {
+            return None;
+        }
+        let volume_uuid = uuid::Uuid::parse_str(&volume.volume_handle).ok()?;
+        let reclaim_policy = pv_spec.persistent_volume_reclaim_policy?;
+        let phase = pv.status?.phase?;

-        if let Some(reclaim_policy) = &pv.spec.as_ref().unwrap().persistent_volume_reclaim_policy {
-            if let Some(phase) = &pv.status.as_ref().unwrap().phase {
-                if reclaim_policy == "Retain" && phase == "Released" {
-                    debug!(pv.name = pv.name_any(), "PV is a deletion candidate");
-                    if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-                        delete_volume(&provisioner.volume_handle.to_string()).await;
-                    }
-                }
-                if phase == "Bound" {
-                    match self.pv_handle.get_opt(&pv.name_any()).await {
-                        Ok(pvol) => match pvol {
-                            Some(_) => debug!(pv.name = pv.name_any(), "PV present on API server"),
-                            None => {
-                                debug!(pv.name = pv.name_any(), "PV is a deletion candidate");
-                                if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-                                    delete_volume(&provisioner.volume_handle.to_string()).await;
-                                }
-                            }
-                        },
-                        Err(error) => error!(%error, "Error while verifying if PV is present"),
-                    }
-                }
-            }
-        }
-    }
+        if reclaim_policy == "Retain" && phase == "Released" {
+            info!(
+                pv.name = pv_name,
+                pv.reclaim_policy = reclaim_policy,
+                pv.phase = phase,
+                "PV is a deletion candidate"
+            );
+            self.delete_volume(volume_uuid).await;
+        } else if phase == "Bound" {
+            match self.pv_handle.get_opt(&pv_name).await {
+                Ok(pvol) => match pvol {
+                    Some(_) => debug!(pv.name = pv_name, "PV present on API server"),
+                    None => {
+                        info!(
+                            pv.name = pv_name,
+                            pv.reclaim_policy = reclaim_policy,
+                            pv.phase = phase,
+                            "PV is a deletion candidate"
+                        );
+                        self.delete_volume(volume_uuid).await;
+                    }
+                },
+                Err(error) => error!(%error, "Error while verifying if PV is present"),
+            }
+        }
+
+        Some(())
+    }
+
+    async fn orphan_volumes_watcher(self) {
+        info!("Starting Orphaned Volumes Garbage Collector");
+        let Some(period) = self.orphan_period else {
+            return self.delete_orphan_volumes().await;
+        };
+        let mut ticker = tokio::time::interval(period.into());
+        loop {
+            ticker.tick().await;
+            self.delete_orphan_volumes().await;
+        }
+    }

-    /// Handle if there is any missed events at startup.
-    async fn handle_missed_events(&self) {
-        debug!("Handling if any missed events");
-        match IoEngineApiClient::get_client()
-            .list_volumes(0, "".to_string())
-            .await
-        {
-            Ok(volume_list) => {
-                for vol in volume_list.entries {
-                    let pv = "pvc-".to_string() + &vol.spec.uuid.to_string();
-                    if let Ok(pvol) = self.pv_handle.get_opt(&pv).await {
-                        match pvol {
-                            Some(_) => {}
-                            None => {
-                                debug!(pv.name = pv, "PV is a deletion candidate");
-                                let vol_handle = &vol.spec.uuid.to_string();
-                                delete_volume(vol_handle).await;
-                            }
-                        }
-                    }
-                }
-            }
-            Err(error) => error!(?error, "Unable to list volumes"),
-        }
-    }
-}
+    /// Deletes orphaned volumes (i.e. volumes with no corresponding PV), which can be useful:
+    /// 1. if there are any missed events at startup
+    /// 2. to tackle a k8s bug where volumes are leaked when PV deletion is attempted before
+    ///    PVC deletion.
+    async fn delete_orphan_volumes(&self) {
+        let max_entries = 200;
+        let mut starting_token = Some(0);
+        while let Some(token) = starting_token {
+            match self
+                .rest_client
+                .list_volumes(max_entries, ListToken::Number(token))
+                .await
+            {
+                Ok(volumes) => {
+                    starting_token = volumes.next_token;
+                    for vol in volumes.entries {
+                        if self.is_vol_orphaned(&vol.spec.uuid).await {
+                            self.delete_volume(vol.spec.uuid).await;
+                        }
+                    }
+                }
+                Err(error) => {
+                    error!(?error, "Unable to list volumes");
+                    return;
+                }
+            }
+        }
+    }

-/// Accepts volume id and calls Control plane api to delete the Volume
-async fn delete_volume(vol_handle: &str) {
-    let volume_uuid = Uuid::parse_str(vol_handle).unwrap();
-    match IoEngineApiClient::get_client()
-        .delete_volume(&volume_uuid)
-        .await
-    {
-        Ok(_) => info!(volume.uuid = %volume_uuid, "Successfully deleted volume"),
-        Err(error) => error!(?error, volume.uuid = %volume_uuid, "Failed to delete volume"),
+    async fn is_vol_orphaned(&self, volume_uuid: &uuid::Uuid) -> bool {
+        let pv_name = format!("pvc-{volume_uuid}");
+        if let Ok(None) = self.pv_handle.get_opt(&pv_name).await {
+            debug!(pv.name = pv_name, "PV is a deletion candidate");
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Accepts volume id and calls Control plane api to delete the Volume.
+    #[tracing::instrument(level = "info", skip(self, volume_uuid), fields(volume.uuid = %volume_uuid))]
+    async fn delete_volume(&self, volume_uuid: uuid::Uuid) {
+        // This shouldn't happen, but to be safe, ensure we don't bump heads with the provisioning.
+        let Ok(_guard) = csi_driver::limiter::VolumeOpGuard::new(volume_uuid) else {
+            error!("Volume cannot be deleted as it's in use within the csi-controller plugin");
+            return;
+        };
+        match self.rest_client.delete_volume(&volume_uuid).await {
+            Ok(_) => info!("Successfully deleted the volume"),
+            Err(error) => error!(?error, "Failed to delete the volume"),
+        }
     }
 }
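delete_orphan_volumes() pages through the REST API 200 volumes at a time, chasing next_token until the server stops returning one. A self-contained toy of that paging loop (Page and fetch are stand-ins for the REST client and its Volumes response):

    struct Page {
        entries: Vec<u32>,
        next_token: Option<isize>,
    }

    // Serve 10 pretend volumes in pages of `max`, withholding the token on the last page.
    fn fetch(start: isize, max: isize) -> Page {
        let all: Vec<u32> = (0..10).collect();
        let end = (start + max).min(all.len() as isize);
        Page {
            entries: all[start as usize..end as usize].to_vec(),
            next_token: (end < all.len() as isize).then_some(end),
        }
    }

    fn main() {
        let mut starting_token = Some(0);
        let mut seen = Vec::new();
        // Same shape as the `while let Some(token)` loop in delete_orphan_volumes().
        while let Some(token) = starting_token {
            let page = fetch(token, 3);
            starting_token = page.next_token;
            seen.extend(page.entries);
        }
        assert_eq!(seen, (0..10).collect::<Vec<u32>>());
    }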
diff --git a/control-plane/csi-driver/src/bin/controller/server.rs b/control-plane/csi-driver/src/bin/controller/server.rs
index cad892fe1..351504f4c 100644
--- a/control-plane/csi-driver/src/bin/controller/server.rs
+++ b/control-plane/csi-driver/src/bin/controller/server.rs
@@ -1,4 +1,3 @@
-use crate::IoEngineApiClient;
 use futures::TryFutureExt;
 use tokio::{
     io::{AsyncRead, AsyncWrite, ReadBuf},
@@ -34,10 +33,12 @@ impl Connected for UnixStream {
     }
 }

+// Not sure why we need the inner fields, probably worth checking if we can remove them.
 #[derive(Clone, Debug)]
-pub struct UdsConnectInfo {
-    pub peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
-    pub peer_cred: Option<tokio::net::unix::UCred>,
+#[allow(unused)]
+struct UdsConnectInfo {
+    peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
+    peer_cred: Option<tokio::net::unix::UCred>,
 }

 impl AsyncRead for UnixStream {
@@ -68,33 +69,16 @@ impl AsyncWrite for UnixStream {
     }
 }

-pub struct CsiServer {}
-
-#[tracing::instrument]
-async fn ping_rest_api() {
-    info!("Checking REST API endpoint accessibility ...");
-
-    match IoEngineApiClient::get_client().list_nodes().await {
-        Err(e) => error!(?e, "REST API endpoint is not accessible"),
-        Ok(nodes) => {
-            let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
-            info!(
-                "REST API endpoints available, {} IoEngine node(s) reported: {:?}",
-                names.len(),
-                names,
-            );
-        }
-    }
-}
-
+pub(super) struct CsiServer {}
 impl CsiServer {
-    pub async fn run(csi_socket: String) -> anyhow::Result<()> {
+    /// Runs the CSI Server identity and controller services.
+    pub async fn run(csi_socket: &str) -> anyhow::Result<()> {
         // It seems we're not ensuring only 1 csi server is running at a time because here
         // we don't bind to a port for example but to a unix socket.
         // todo: Can we do something about this?

         // Remove existing CSI socket from previous runs.
-        match fs::remove_file(&csi_socket) {
+        match fs::remove_file(csi_socket) {
             Ok(_) => info!("Removed stale CSI socket {}", csi_socket),
             Err(err) => {
                 if err.kind() != ErrorKind::NotFound {
@@ -108,12 +92,12 @@ impl CsiServer {
         debug!("CSI RPC server is listening on {}", csi_socket);

         let incoming = {
-            let uds = UnixListener::bind(&csi_socket)?;
+            let uds = UnixListener::bind(csi_socket)?;

             // Change permissions on CSI socket to allow non-privileged clients to access it
             // to simplify testing.
             if let Err(e) = fs::set_permissions(
-                &csi_socket,
+                csi_socket,
                 std::os::unix::fs::PermissionsExt::from_mode(0o777),
             ) {
                 error!("Failed to change permissions for CSI socket: {:?}", e);
@@ -129,9 +113,6 @@ impl CsiServer {
             }
         };

-        // Try to detect REST API endpoint to debug the accessibility status.
-        ping_rest_api().await;
-
         let cfg = crate::CsiControllerConfig::get_config();

         Server::builder()
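CsiServer::run now owns the whole socket lifecycle: remove any stale socket file, bind, then relax the permissions so unprivileged test clients can connect. A condensed sketch of that sequence using the blocking std types (the server above uses tokio's UnixListener and a tonic Server instead):

    use std::{fs, io::ErrorKind, os::unix::fs::PermissionsExt};

    fn bind_csi_socket(path: &str) -> std::io::Result<std::os::unix::net::UnixListener> {
        // A socket file left over from a previous run would make bind() fail.
        if let Err(err) = fs::remove_file(path) {
            if err.kind() != ErrorKind::NotFound {
                return Err(err);
            }
        }
        let listener = std::os::unix::net::UnixListener::bind(path)?;
        // 0o777 mirrors the permissions set above for non-privileged clients.
        fs::set_permissions(path, fs::Permissions::from_mode(0o777))?;
        Ok(listener)
    }

    fn main() -> std::io::Result<()> {
        let _listener = bind_csi_socket("/tmp/example-csi.sock")?;
        Ok(())
    }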
diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs
index a893db3e9..723aa8956 100644
--- a/control-plane/csi-driver/src/bin/node/main_.rs
+++ b/control-plane/csi-driver/src/bin/node/main_.rs
@@ -236,7 +236,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
         anyhow::bail!("Failed to detect nvme_tcp kernel module. Run `modprobe nvme_tcp` to load the kernel module. {}", error);
     }

-    if platform::current_plaform_type() == platform::PlatformType::K8s {
+    if platform::current_platform_type() == platform::PlatformType::K8s {
         check_ana_and_label_node(matches.get_one::<String>("node-name").expect("required")).await?;
     }
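The platform check gating the ANA probe relies on current_platform_type(), fixed in the next file: in-cluster pods always have KUBERNETES_SERVICE_HOST set, so its presence selects K8s. A toy version of just the rule visible in that hunk (how the real crate distinguishes Deployer from None is not shown there, so the fallback collapses into one variant):

    #[derive(Debug, PartialEq, Eq)]
    enum PlatformType {
        K8s,
        Other,
    }

    fn current_platform_type() -> PlatformType {
        // The kubelet injects KUBERNETES_SERVICE_HOST into every pod's environment.
        if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
            PlatformType::K8s
        } else {
            PlatformType::Other
        }
    }

    fn main() {
        // Prints `K8s` inside a pod, `Other` elsewhere.
        println!("{:?}", current_platform_type());
    }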
diff --git a/utils/platform/src/lib.rs b/utils/platform/src/lib.rs
index 4718b7dca..71724cb67 100644
--- a/utils/platform/src/lib.rs
+++ b/utils/platform/src/lib.rs
@@ -33,7 +33,7 @@ pub enum PlatformType {
 }

 /// Get the current `PlatformType`.
-pub fn current_plaform_type() -> PlatformType {
+pub fn current_platform_type() -> PlatformType {
     if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
         PlatformType::K8s
     } else {
@@ -51,7 +51,7 @@ static PLATFORM: OnceCell<Box<dyn PlatformInfo>> = OnceCell::const_new();
 pub async fn init_cluster_info() -> Result<&'static dyn PlatformInfo, PlatformError> {
     PLATFORM
         .get_or_try_init(|| async move {
-            Ok(match current_plaform_type() {
+            Ok(match current_platform_type() {
                 PlatformType::K8s => Box::new(k8s::K8s::new().await?)
                     as Box<dyn PlatformInfo>,
                 PlatformType::Deployer => {
                     Box::new(deployer::Deployer::new()) as Box<dyn PlatformInfo>