From ada7be5e15d3a09e3abc85eeee7b977dce583ded Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 23 Jan 2024 20:41:44 +0000
Subject: [PATCH 1/4] feat(csi-controller/orphaned-vols): gc orphaned volumes
 on a timer

Due to a k8s bug, when PV deletion is attempted before PVC deletion, we may
never receive the delete request, thus leaking volumes which cannot be
deleted through any user-facing API.
Existing code already cleans up orphaned volumes, but only to cover PV events
missed at startup; the current workaround is therefore to delete the
csi-controller pod so that the cleanup runs again.
This change reuses that logic but runs it on a timer (every 10 minutes by
default, configurable via the new --orphan-vol-gc-period argument), allowing
us to work around the bug automatically.

Signed-off-by: Tiago Castro
---
 .../csi-driver/src/bin/controller/main.rs     | 16 ++++++-
 .../src/bin/controller/pvwatcher.rs           | 46 +++++++++++++------
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs
index b3567d6a9..bfcd0c919 100644
--- a/control-plane/csi-driver/src/bin/controller/main.rs
+++ b/control-plane/csi-driver/src/bin/controller/main.rs
@@ -78,6 +78,15 @@ async fn main() -> anyhow::Result<()> {
                 "The number of worker threads that process requests"
             ),
         )
+        .arg(
+            Arg::new("orphan-vol-gc-period")
+                .long("orphan-vol-gc-period")
+                .default_value("10m")
+                .help(
+                    "How often to check for and delete orphaned volumes. \n\
+                    An orphaned volume is a volume with no corresponding PV",
+                )
+        )
         .get_matches();

     utils::print_package_info!();
@@ -99,9 +108,14 @@ async fn main() -> anyhow::Result<()> {
         CsiControllerConfig::get_config().rest_endpoint()
     );

+    let orphan_period = args
+        .get_one::<String>("orphan-vol-gc-period")
+        .map(|p| p.parse::<humantime::Duration>())
+        .transpose()?;
+
     // Starts PV Garbage Collector if platform type is k8s
     if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
-        let gc_instance = pvwatcher::PvGarbageCollector::new().await?;
+        let gc_instance = pvwatcher::PvGarbageCollector::new(orphan_period).await?;
         tokio::spawn(async move { gc_instance.run_watcher().await });
     }

diff --git a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
index 69e4140a5..c617f4833 100644
--- a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
+++ b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
@@ -16,24 +16,24 @@ use crate::IoEngineApiClient;
 #[derive(Clone)]
 pub(crate) struct PvGarbageCollector {
     pub(crate) pv_handle: Api<PersistentVolume>,
+    orphan_period: Option<humantime::Duration>,
 }

 /// Methods implemented by PV Garbage Collector
 impl PvGarbageCollector {
     /// Returns an instance of PV Garbage collector
-    pub(crate) async fn new() -> anyhow::Result<Self> {
+    pub(crate) async fn new(orphan_period: Option<humantime::Duration>) -> anyhow::Result<Self> {
         let client = Client::try_default().await?;
         Ok(Self {
             pv_handle: Api::<PersistentVolume>::all(client),
+            orphan_period,
         })
     }
     /// Starts watching PV events
     pub(crate) async fn run_watcher(&self) {
         info!("Starting PV Garbage Collector");
         let cloned_self = self.clone();
-        tokio::spawn(async move {
-            cloned_self.handle_missed_events().await;
-        });
+        tokio::spawn(cloned_self.orphan_volumes_watcher());
         watcher(self.pv_handle.clone(), watcher::Config::default())
             .touched_objects()
             .try_for_each(|pvol| async {
@@ -80,36 +80,52 @@ impl PvGarbageCollector {
         }
     }

+    async fn orphan_volumes_watcher(self) {
+        let Some(period) = self.orphan_period else {
+            return self.handle_missed_events().await;
+        };
+        let mut ticker = tokio::time::interval(period.into());
+        loop {
+            ticker.tick().await;
+            self.handle_missed_events().await;
+        }
+    }
+
     /// Handle if there is any missed events at startup.
     async fn handle_missed_events(&self) {
-        debug!("Handling if any missed events");
         match IoEngineApiClient::get_client()
             .list_volumes(0, "".to_string())
             .await
         {
             Ok(volume_list) => {
                 for vol in volume_list.entries {
-                    let pv = "pvc-".to_string() + &vol.spec.uuid.to_string();
-                    if let Ok(pvol) = self.pv_handle.get_opt(&pv).await {
-                        match pvol {
-                            Some(_) => {}
-                            None => {
-                                debug!(pv.name = pv, "PV is a deletion candidate");
-                                let vol_handle = &vol.spec.uuid.to_string();
-                                delete_volume(vol_handle).await;
-                            }
-                        }
+                    if self.is_vol_orphaned(&vol.spec.uuid).await {
+                        delete_volume(&vol.spec.uuid.to_string()).await;
                     }
                 }
             }
             Err(error) => error!(?error, "Unable to list volumes"),
         }
     }
+
+    async fn is_vol_orphaned(&self, volume_uuid: &uuid::Uuid) -> bool {
+        let pv_name = format!("pvc-{volume_uuid}");
+        if let Ok(None) = self.pv_handle.get_opt(&pv_name).await {
+            debug!(pv.name = pv_name, "PV is a deletion candidate");
+            true
+        } else {
+            false
+        }
+    }
 }

 /// Accepts volume id and calls Control plane api to delete the Volume
 async fn delete_volume(vol_handle: &str) {
     let volume_uuid = Uuid::parse_str(vol_handle).unwrap();
+    // this shouldn't happen but, to be safe, ensure we don't race with provisioning
+    let Ok(_guard) = csi_driver::limiter::VolumeOpGuard::new(volume_uuid) else {
+        return;
+    };
     match IoEngineApiClient::get_client()
         .delete_volume(&volume_uuid)
         .await

From 8af25cf1c6e154e394213d94cf9d88dd73183eb8 Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 23 Jan 2024 23:19:40 +0000
Subject: [PATCH 2/4] refactor(csi-controller/gc): reduce excessive unwrap and
 nesting

Signed-off-by: Tiago Castro
---
 .../csi-driver/src/bin/controller/main.rs     |  43 ++++--
 .../src/bin/controller/pvwatcher.rs           | 122 +++++++++---------
 .../csi-driver/src/bin/controller/server.rs   |  40 ++----
 3 files changed, 102 insertions(+), 103 deletions(-)

diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs
index bfcd0c919..e89a8ee05 100644
--- a/control-plane/csi-driver/src/bin/controller/main.rs
+++ b/control-plane/csi-driver/src/bin/controller/main.rs
@@ -23,6 +23,23 @@ fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
     Ok(())
 }

+#[tracing::instrument]
+async fn ping_rest_api() {
+    info!("Checking REST API endpoint accessibility ...");
+
+    match IoEngineApiClient::get_client().list_nodes().await {
+        Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
+        Ok(nodes) => {
+            let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
+            info!(
+                "REST API endpoints available, {} IoEngine node(s) reported: {:?}",
+                names.len(),
+                names,
+            );
+        }
+    }
+}
+
 #[tokio::main(worker_threads = 2)]
 async fn main() -> anyhow::Result<()> {
     let args = clap::Command::new(utils::package_description!())
@@ -33,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
             .short('r')
             .env("ENDPOINT")
             .default_value("http://ksnode-1:30011")
-            .help("a URL endpoint to the control plane's rest endpoint"),
+            .help("A URL endpoint to the control plane's rest endpoint"),
         )
         .arg(
             Arg::new("socket")
                 .long("socket")
                 .short('c')
                 .env("CSI_SOCKET")
                 .default_value(CSI_SOCKET)
-                .help("CSI socket path"),
+                .help("The CSI socket path"),
         )
         .arg(
             Arg::new("jaeger")
                 .short('j')
                 .long("jaeger")
                 .env("JAEGER_ENDPOINT")
-                .help("enable open telemetry and forward to jaeger"),
+                .help("Enable open telemetry and forward to jaeger"),
         )
         .arg(
             Arg::new("timeout")
@@ -100,6 +117,13 @@ async fn main() -> anyhow::Result<()> {
         tags,
         args.get_one::<String>("jaeger").cloned(),
     );
+    let orphan_period = args
+        .get_one::<String>("orphan-vol-gc-period")
+        .map(|p| p.parse::<humantime::Duration>())
+        .transpose()?;
+    let csi_socket = args
+        .get_one::<String>("socket")
+        .expect("CSI socket must be specified");

     initialize_controller(&args)?;

@@ -108,10 +132,8 @@ async fn main() -> anyhow::Result<()> {
         CsiControllerConfig::get_config().rest_endpoint()
     );

-    let orphan_period = args
-        .get_one::<String>("orphan-vol-gc-period")
-        .map(|p| p.parse::<humantime::Duration>())
-        .transpose()?;
+    // Try to detect REST API endpoint to debug the accessibility status.
+    ping_rest_api().await;

     // Starts PV Garbage Collector if platform type is k8s
     if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
@@ -119,12 +141,7 @@ async fn main() -> anyhow::Result<()> {
         tokio::spawn(async move { gc_instance.run_watcher().await });
     }

-    let result = server::CsiServer::run(
-        args.get_one::<String>("socket")
-            .expect("CSI socket must be specified")
-            .clone(),
-    )
-    .await;
+    let result = server::CsiServer::run(csi_socket).await;
     utils::tracing_telemetry::flush_traces();
     result
 }
diff --git a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
index c617f4833..5d84620ba 100644
--- a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
+++ b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
@@ -6,17 +6,13 @@ use kube::{
     Client, ResourceExt,
 };
 use tracing::{debug, error, info};
-use uuid::Uuid;
-
-use csi_driver::CSI_PLUGIN_NAME;
-
-use crate::IoEngineApiClient;

 /// Struct for PV Garbage collector
 #[derive(Clone)]
 pub(crate) struct PvGarbageCollector {
     pub(crate) pv_handle: Api<PersistentVolume>,
     orphan_period: Option<humantime::Duration>,
+    rest_client: &'static crate::IoEngineApiClient,
 }

 /// Methods implemented by PV Garbage Collector
 impl PvGarbageCollector {
     /// Returns an instance of PV Garbage collector
     pub(crate) async fn new(orphan_period: Option<humantime::Duration>) -> anyhow::Result<Self> {
         let client = Client::try_default().await?;
         Ok(Self {
             pv_handle: Api::<PersistentVolume>::all(client),
             orphan_period,
+            rest_client: crate::IoEngineApiClient::get_client(),
         })
     }

-    /// Starts watching PV events
+    /// Starts watching PV events.
     pub(crate) async fn run_watcher(&self) {
+        tokio::spawn(self.clone().orphan_volumes_watcher());
         info!("Starting PV Garbage Collector");
-        let cloned_self = self.clone();
-        tokio::spawn(cloned_self.orphan_volumes_watcher());
         watcher(self.pv_handle.clone(), watcher::Config::default())
             .touched_objects()
             .try_for_each(|pvol| async {
@@ -44,63 +40,71 @@ impl PvGarbageCollector {
             .expect("Watcher unexpectedly terminated");
     }

-    async fn process_object(&self, pv: PersistentVolume) {
-        if pv.metadata.clone().deletion_timestamp.is_none() {
-            return;
-        }
-        if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-            if provisioner.driver != CSI_PLUGIN_NAME {
-                return;
-            }
+    async fn process_object(&self, pv: PersistentVolume) -> Option<()> {
+        let pv_name = pv.name_any();
+
+        pv.metadata.deletion_timestamp?;
+
+        let pv_spec = pv.spec?;
+        let volume = pv_spec.csi?;
+        if volume.driver != csi_driver::CSI_PLUGIN_NAME {
+            return None;
         }
+        let volume_uuid = uuid::Uuid::parse_str(&volume.volume_handle).ok()?;
+        let reclaim_policy = pv_spec.persistent_volume_reclaim_policy?;
+        let phase = pv.status?.phase?;

-        if let Some(reclaim_policy) = &pv.spec.as_ref().unwrap().persistent_volume_reclaim_policy {
-            if let Some(phase) = &pv.status.as_ref().unwrap().phase {
-                if reclaim_policy == "Retain" && phase == "Released" {
-                    debug!(pv.name = pv.name_any(), "PV is a deletion candidate");
-                    if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-                        delete_volume(&provisioner.volume_handle.to_string()).await;
-                    }
-                }
-                if phase == "Bound" {
-                    match self.pv_handle.get_opt(&pv.name_any()).await {
-                        Ok(pvol) => match pvol {
-                            Some(_) => debug!(pv.name = pv.name_any(), "PV present on API server"),
-                            None => {
-                                debug!(pv.name = pv.name_any(), "PV is a deletion candidate");
-                                if let Some(provisioner) = &pv.spec.as_ref().unwrap().csi {
-                                    delete_volume(&provisioner.volume_handle.to_string()).await;
-                                }
-                            }
-                        },
-                        Err(error) => error!(%error, "Error while verifying if PV is present"),
+        if reclaim_policy == "Retain" && phase == "Released" {
+            info!(
+                pv.name = pv_name,
+                pv.reclaim_policy = reclaim_policy,
+                pv.phase = phase,
+                "PV is a deletion candidate"
+            );
+            self.delete_volume(volume_uuid).await;
+        } else if phase == "Bound" {
+            match self.pv_handle.get_opt(&pv_name).await {
+                Ok(pvol) => match pvol {
+                    Some(_) => debug!(pv.name = pv_name, "PV present on API server"),
+                    None => {
+                        info!(
+                            pv.name = pv_name,
+                            pv.reclaim_policy = reclaim_policy,
+                            pv.phase = phase,
+                            "PV is a deletion candidate"
+                        );
+                        self.delete_volume(volume_uuid).await;
                     }
-                }
+                },
+                Err(error) => error!(%error, "Error while verifying if PV is present"),
             }
         }
+
+        Some(())
     }

     async fn orphan_volumes_watcher(self) {
+        info!("Starting Orphaned Volumes Garbage Collector");
         let Some(period) = self.orphan_period else {
-            return self.handle_missed_events().await;
+            return self.delete_orphan_volumes().await;
         };
         let mut ticker = tokio::time::interval(period.into());
         loop {
             ticker.tick().await;
-            self.handle_missed_events().await;
+            self.delete_orphan_volumes().await;
         }
     }

-    /// Handle if there is any missed events at startup.
-    async fn handle_missed_events(&self) {
-        match IoEngineApiClient::get_client()
-            .list_volumes(0, "".to_string())
-            .await
-        {
+    /// Deletes orphaned volumes (i.e. volumes with no corresponding PV), which can be useful:
+    /// 1. if any events were missed at startup
+    /// 2. to tackle a k8s bug where volumes are leaked when PV deletion is attempted before
+    ///    PVC deletion.
+    async fn delete_orphan_volumes(&self) {
+        match self.rest_client.list_volumes(0, "".to_string()).await {
             Ok(volume_list) => {
                 for vol in volume_list.entries {
                     if self.is_vol_orphaned(&vol.spec.uuid).await {
-                        delete_volume(&vol.spec.uuid.to_string()).await;
+                        self.delete_volume(vol.spec.uuid).await;
                     }
                 }
             }
             Err(error) => error!(?error, "Unable to list volumes"),
         }
     }
+
     async fn is_vol_orphaned(&self, volume_uuid: &uuid::Uuid) -> bool {
         let pv_name = format!("pvc-{volume_uuid}");
         if let Ok(None) = self.pv_handle.get_opt(&pv_name).await {
@@ -117,20 +121,18 @@ impl PvGarbageCollector {
             false
         }
     }
-}

-/// Accepts volume id and calls Control plane api to delete the Volume
-async fn delete_volume(vol_handle: &str) {
-    let volume_uuid = Uuid::parse_str(vol_handle).unwrap();
-    // this shouldn't happen but, to be safe, ensure we don't race with provisioning
-    let Ok(_guard) = csi_driver::limiter::VolumeOpGuard::new(volume_uuid) else {
-        return;
-    };
-    match IoEngineApiClient::get_client()
-        .delete_volume(&volume_uuid)
-        .await
-    {
-        Ok(_) => info!(volume.uuid = %volume_uuid, "Successfully deleted volume"),
-        Err(error) => error!(?error, volume.uuid = %volume_uuid, "Failed to delete volume"),
+    /// Accepts a volume id and calls the control plane API to delete the volume.
+    #[tracing::instrument(level = "info", skip(self, volume_uuid), fields(volume.uuid = %volume_uuid))]
+    async fn delete_volume(&self, volume_uuid: uuid::Uuid) {
+        // this shouldn't happen but, to be safe, ensure we don't race with provisioning
+        let Ok(_guard) = csi_driver::limiter::VolumeOpGuard::new(volume_uuid) else {
+            error!("Volume cannot be deleted as it's in use within the csi-controller plugin");
+            return;
+        };
+        match self.rest_client.delete_volume(&volume_uuid).await {
+            Ok(_) => info!("Successfully deleted the volume"),
+            Err(error) => error!(?error, "Failed to delete the volume"),
+        }
     }
 }
diff --git a/control-plane/csi-driver/src/bin/controller/server.rs b/control-plane/csi-driver/src/bin/controller/server.rs
index cad892fe1..70f67904b 100644
--- a/control-plane/csi-driver/src/bin/controller/server.rs
+++ b/control-plane/csi-driver/src/bin/controller/server.rs
@@ -1,4 +1,3 @@
-use crate::IoEngineApiClient;
 use futures::TryFutureExt;
 use tokio::{
     io::{AsyncRead, AsyncWrite, ReadBuf},
@@ -35,9 +34,10 @@ impl Connected for UnixStream {
 }

 #[derive(Clone, Debug)]
-pub struct UdsConnectInfo {
-    pub peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
-    pub peer_cred: Option<tokio::net::unix::UCred>,
+#[allow(unused)]
+struct UdsConnectInfo {
+    peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
+    peer_cred: Option<tokio::net::unix::UCred>,
 }

 impl AsyncRead for UnixStream {
@@ -68,33 +68,16 @@ impl AsyncWrite for UnixStream {
     }
 }

-pub struct CsiServer {}
-
-#[tracing::instrument]
-async fn ping_rest_api() {
-    info!("Checking REST API endpoint accessibility ...");
-
-    match IoEngineApiClient::get_client().list_nodes().await {
-        Err(e) => error!(?e, "REST API endpoint is not accessible"),
-        Ok(nodes) => {
-            let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
-            info!(
-                "REST API endpoints available, {} IoEngine node(s) reported: {:?}",
-                names.len(),
-                names,
-            );
-        }
-    }
-}
-
+pub(super) struct CsiServer {}
 impl CsiServer {
-    pub async fn run(csi_socket: String) -> anyhow::Result<()> {
+    /// Runs the CSI Server identity and controller services.
+    pub async fn run(csi_socket: &str) -> anyhow::Result<()> {
         // It seems we're not ensuring only 1 csi server is running at a time because here
         // we don't bind to a port for example but to a unix socket.
         // todo: Can we do something about this?

         // Remove existing CSI socket from previous runs.
-        match fs::remove_file(&csi_socket) {
+        match fs::remove_file(csi_socket) {
             Ok(_) => info!("Removed stale CSI socket {}", csi_socket),
             Err(err) => {
                 if err.kind() != ErrorKind::NotFound {
@@ -108,12 +91,12 @@ impl CsiServer {
         debug!("CSI RPC server is listening on {}", csi_socket);

         let incoming = {
-            let uds = UnixListener::bind(&csi_socket)?;
+            let uds = UnixListener::bind(csi_socket)?;

             // Change permissions on CSI socket to allow non-privileged clients to access it
             // to simplify testing.
             if let Err(e) = fs::set_permissions(
-                &csi_socket,
+                csi_socket,
                 std::os::unix::fs::PermissionsExt::from_mode(0o777),
             ) {
                 error!("Failed to change permissions for CSI socket: {:?}", e);
@@ -129,9 +112,6 @@ impl CsiServer {
             }
         };

-        // Try to detect REST API endpoint to debug the accessibility status.
-        ping_rest_api().await;
-
         let cfg = crate::CsiControllerConfig::get_config();

         Server::builder()

From 4576447cd34a65681ec5c7296a9b83875864e41e Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Tue, 23 Jan 2024 23:38:24 +0000
Subject: [PATCH 3/4] chore: correct typo in the platform code

Signed-off-by: Tiago Castro
---
 .../src/bin/core/controller/reconciler/node/nexus.rs | 2 +-
 control-plane/agents/src/bin/ha/node/path_provider.rs | 4 ++--
 control-plane/csi-driver/src/bin/controller/main.rs | 9 ++++-----
 control-plane/csi-driver/src/bin/node/main_.rs | 2 +-
 utils/platform/src/lib.rs | 4 ++--
 5 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs b/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
index 57cabcb78..be81b5fd6 100644
--- a/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
+++ b/control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
@@ -140,7 +140,7 @@ async fn find_shutdown_volumes(
     // else the drain operation is timed out
     // todo: this should be handled better..
-    if platform::current_plaform_type() == platform::PlatformType::Deployer {
+    if platform::current_platform_type() == platform::PlatformType::Deployer {
         for vi in node_spec.node_draining_volumes().await {
             let mut volume = context.specs().volume(&vi).await?;
             let request = DestroyShutdownTargets::new(vi, None);
diff --git a/control-plane/agents/src/bin/ha/node/path_provider.rs b/control-plane/agents/src/bin/ha/node/path_provider.rs
index 5b23426aa..26c3d0092 100755
--- a/control-plane/agents/src/bin/ha/node/path_provider.rs
+++ b/control-plane/agents/src/bin/ha/node/path_provider.rs
@@ -1,5 +1,5 @@
 use nvmeadm::nvmf_subsystem::{NvmeSubsystems, Subsystem};
-use stor_port::platform::{current_plaform_type, PlatformType};
+use stor_port::platform::{current_platform_type, PlatformType};
 use utils::NVME_TARGET_NQN_PREFIX;

 #[cfg(target_os = "linux")]
@@ -121,7 +121,7 @@ impl CachedNvmePathProvider {
     }
     #[cfg(target_os = "linux")]
     fn udev_supported() -> bool {
-        match current_plaform_type() {
+        match current_platform_type() {
             PlatformType::K8s => true,
             PlatformType::None => true,
             PlatformType::Deployer => false,
diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs
index e89a8ee05..56ab2c732 100644
--- a/control-plane/csi-driver/src/bin/controller/main.rs
+++ b/control-plane/csi-driver/src/bin/controller/main.rs
@@ -19,7 +19,7 @@ const REST_TIMEOUT: &str = "30s";
 fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
     CsiControllerConfig::initialize(args)?;
     IoEngineApiClient::initialize()
-        .map_err(|e| anyhow::anyhow!("Failed to initialize API client, error = {}", e))?;
+        .map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
     Ok(())
 }

@@ -32,9 +32,8 @@ async fn ping_rest_api() {
         Ok(nodes) => {
             let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
             info!(
-                "REST API endpoints available, {} IoEngine node(s) reported: {:?}",
-                names.len(),
-                names,
+                "REST API endpoint available, {len} IoEngine node(s) reported: {names:?}",
+                len = names.len(),
             );
         }
     }
@@ -136,7 +135,7 @@ async fn main() -> anyhow::Result<()> {
     ping_rest_api().await;

     // Starts PV Garbage Collector if platform type is k8s
-    if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
+    if stor_port::platform::current_platform_type() == stor_port::platform::PlatformType::K8s {
         let gc_instance = pvwatcher::PvGarbageCollector::new(orphan_period).await?;
         tokio::spawn(async move { gc_instance.run_watcher().await });
     }
diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs
index a893db3e9..723aa8956 100644
--- a/control-plane/csi-driver/src/bin/node/main_.rs
+++ b/control-plane/csi-driver/src/bin/node/main_.rs
@@ -236,7 +236,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
         anyhow::bail!("Failed to detect nvme_tcp kernel module. Run `modprobe nvme_tcp` to load the kernel module. {}", error);
     }

-    if platform::current_plaform_type() == platform::PlatformType::K8s {
+    if platform::current_platform_type() == platform::PlatformType::K8s {
         check_ana_and_label_node(matches.get_one::<String>("node-name").expect("required")).await?;
     }

diff --git a/utils/platform/src/lib.rs b/utils/platform/src/lib.rs
index 4718b7dca..71724cb67 100644
--- a/utils/platform/src/lib.rs
+++ b/utils/platform/src/lib.rs
@@ -33,7 +33,7 @@ pub enum PlatformType {
 }

 /// Get the current `PlatformType`.
-pub fn current_plaform_type() -> PlatformType {
+pub fn current_platform_type() -> PlatformType {
     if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
         PlatformType::K8s
     } else {
@@ -51,7 +51,7 @@ static PLATFORM: OnceCell<Box<dyn PlatformInfo>> = OnceCell::const_new();
 pub async fn init_cluster_info() -> Result<&'static dyn PlatformInfo, PlatformError> {
     PLATFORM
         .get_or_try_init(|| async move {
-            Ok(match current_plaform_type() {
+            Ok(match current_platform_type() {
                 PlatformType::K8s => Box::new(k8s::K8s::new().await?) as Box<dyn PlatformInfo>,
                 PlatformType::Deployer => {
                     Box::new(deployer::Deployer::new()) as Box<dyn PlatformInfo>

From e4a5f564dc6f8462d52edb8681740183b511b75a Mon Sep 17 00:00:00 2001
From: Tiago Castro
Date: Wed, 24 Jan 2024 00:16:18 +0000
Subject: [PATCH 4/4] fix(csi-controller/gc): list volumes with pagination

Signed-off-by: Tiago Castro
---
 .../csi-driver/src/bin/controller/client.rs | 18 ++++++++-----
 .../src/bin/controller/controller.rs        |  6 +++--
 .../src/bin/controller/pvwatcher.rs         | 25 ++++++++++++++-----
 .../csi-driver/src/bin/controller/server.rs |  1 +
 4 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/control-plane/csi-driver/src/bin/controller/client.rs b/control-plane/csi-driver/src/bin/controller/client.rs
index 15f266b73..fe31c3a78 100644
--- a/control-plane/csi-driver/src/bin/controller/client.rs
+++ b/control-plane/csi-driver/src/bin/controller/client.rs
@@ -148,6 +148,12 @@ impl IoEngineApiClient {
     }
 }

+/// Token used to list volumes with pagination.
+pub(crate) enum ListToken {
+    String(String),
+    Number(isize),
+}
+
 impl IoEngineApiClient {
     /// List all nodes available in IoEngine cluster.
     pub(crate) async fn list_nodes(&self) -> Result<Vec<models::Node>, ApiClientError> {
@@ -178,17 +184,17 @@ impl IoEngineApiClient {
     pub(crate) async fn list_volumes(
         &self,
         max_entries: i32,
-        starting_token: String,
+        starting_token: ListToken,
     ) -> Result<models::Volumes, ApiClientError> {
         let max_entries = max_entries as isize;
-        let starting_token = if starting_token.is_empty() {
-            0
-        } else {
-            starting_token.parse::<isize>().map_err(|_| {
+        let starting_token = match starting_token {
+            ListToken::String(starting_token) if starting_token.is_empty() => 0,
+            ListToken::String(starting_token) => starting_token.parse::<isize>().map_err(|_| {
                 ApiClientError::InvalidArgument(
                     "Failed to parse starting token as an isize".to_string(),
                 )
-            })?
+            })?,
+            ListToken::Number(starting_token) => starting_token,
         };

         let response = self
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index af79cde11..7964ef8dd 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -1,4 +1,6 @@
-use crate::{ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient};
+use crate::{
+    client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
+};
 use csi_driver::context::{CreateParams, PublishParams};
 use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};

@@ -652,7 +654,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
         let vt_mapper = VolumeTopologyMapper::init().await?;

         let volumes = IoEngineApiClient::get_client()
-            .list_volumes(max_entries, args.starting_token)
+            .list_volumes(max_entries, ListToken::String(args.starting_token))
             .await
             .map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;

diff --git a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
index 5d84620ba..8de1287b2 100644
--- a/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
+++ b/control-plane/csi-driver/src/bin/controller/pvwatcher.rs
@@ -1,3 +1,4 @@
+use crate::client::ListToken;
 use futures::TryStreamExt;
 use k8s_openapi::api::core::v1::PersistentVolume;
 use kube::{
@@ -100,15 +101,27 @@ impl PvGarbageCollector {
     /// 2. to tackle a k8s bug where volumes are leaked when PV deletion is attempted before
     ///    PVC deletion.
     async fn delete_orphan_volumes(&self) {
-        match self.rest_client.list_volumes(0, "".to_string()).await {
-            Ok(volume_list) => {
-                for vol in volume_list.entries {
-                    if self.is_vol_orphaned(&vol.spec.uuid).await {
-                        self.delete_volume(vol.spec.uuid).await;
+        let max_entries = 200;
+        let mut starting_token = Some(0);
+        while let Some(token) = starting_token {
+            match self
+                .rest_client
+                .list_volumes(max_entries, ListToken::Number(token))
+                .await
+            {
+                Ok(volumes) => {
+                    starting_token = volumes.next_token;
+                    for vol in volumes.entries {
+                        if self.is_vol_orphaned(&vol.spec.uuid).await {
+                            self.delete_volume(vol.spec.uuid).await;
+                        }
                     }
                 }
+                Err(error) => {
+                    error!(?error, "Unable to list volumes");
+                    return;
+                }
             }
-            Err(error) => error!(?error, "Unable to list volumes"),
         }
     }

diff --git a/control-plane/csi-driver/src/bin/controller/server.rs b/control-plane/csi-driver/src/bin/controller/server.rs
index 70f67904b..351504f4c 100644
--- a/control-plane/csi-driver/src/bin/controller/server.rs
+++ b/control-plane/csi-driver/src/bin/controller/server.rs
@@ -33,6 +33,7 @@ impl Connected for UnixStream {
     }
 }

+// Not sure why we need the inner fields, probably worth checking if we can remove them.
 #[derive(Clone, Debug)]
 #[allow(unused)]
 struct UdsConnectInfo {
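
A note on the timer wiring from patches 1 and 2: a human-readable period such
as "10m" parses into a humantime::Duration, which then converts into a
std::time::Duration for tokio's interval. The following is a minimal,
self-contained sketch of that pattern, not the driver code itself;
cleanup_once is a hypothetical stand-in for delete_orphan_volumes, and it
assumes the humantime crate plus tokio with the time and macros features:

    use std::time::Duration;

    // Hypothetical stand-in for the real GC pass (delete_orphan_volumes).
    async fn cleanup_once() {
        println!("running orphaned-volume cleanup pass");
    }

    // No period configured: run the cleanup once and return.
    // Otherwise: tick forever, mirroring orphan_volumes_watcher above.
    async fn gc_loop(period: Option<humantime::Duration>) {
        let Some(period) = period else {
            return cleanup_once().await;
        };
        // humantime::Duration converts into std::time::Duration for tokio.
        let mut ticker = tokio::time::interval(Duration::from(period));
        loop {
            ticker.tick().await; // the first tick completes immediately
            cleanup_once().await;
        }
    }

    #[tokio::main]
    async fn main() {
        // "10m" parses to 600 seconds; a bad value fails before the loop starts.
        let period: humantime::Duration = "10m".parse().expect("invalid duration");
        gc_loop(Some(period)).await;
    }

Since tokio's interval completes its first tick immediately, the first
cleanup pass still runs at startup rather than one period later, preserving
the original "handle missed events at startup" behaviour.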
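Similarly, the pagination contract adopted in patch 4 reduces to: pass a
numeric starting token, read next_token from each response, and stop when it
comes back None. A self-contained sketch with an in-memory stand-in for the
REST call (Page and fetch_page are illustrative names, not the client's real
API):

    struct Page {
        entries: Vec<String>,
        next_token: Option<isize>, // None once the last page has been served
    }

    // Stand-in for rest_client.list_volumes(max_entries, ListToken::Number(token)).
    fn fetch_page(token: isize, max_entries: isize) -> Page {
        let total = 450; // pretend the cluster holds 450 volumes
        let end = (token + max_entries).min(total);
        Page {
            entries: (token..end).map(|i| format!("pvc-{i:03}")).collect(),
            next_token: (end < total).then_some(end),
        }
    }

    fn main() {
        let max_entries = 200;
        let mut starting_token = Some(0);
        // Same loop shape as delete_orphan_volumes: pages of 200, 200, then 50.
        while let Some(token) = starting_token {
            let page = fetch_page(token, max_entries);
            starting_token = page.next_token;
            println!("processing {} volumes", page.entries.len());
        }
    }

Bounding each request at 200 entries keeps memory use flat regardless of
cluster size, which is the point of the fix: the previous list_volumes(0, ...)
call requested every volume in a single response.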