From 3ba1ffde8e25a667bedb293cce0529d24d7a829f Mon Sep 17 00:00:00 2001 From: Abhinandan Purkait Date: Fri, 9 Feb 2024 12:24:26 +0000 Subject: [PATCH] refactor: use pagination to load values from pstor Signed-off-by: Abhinandan Purkait --- .../src/bin/core/controller/registry.rs | 12 ++- .../core/controller/resources/migration.rs | 5 +- .../controller/resources/operations_helper.rs | 19 ++-- control-plane/agents/src/bin/core/main.rs | 7 +- .../src/bin/core/tests/controller/mod.rs | 92 ++++++++++++++++++- .../src/types/v0/store/nexus_persistence.rs | 23 ++--- utils/pstor/src/api.rs | 8 ++ utils/pstor/src/error.rs | 2 + utils/pstor/src/etcd.rs | 68 ++++++++++++-- utils/pstor/src/products/v1.rs | 2 +- utils/utils-lib/src/constants.rs | 3 + 11 files changed, 203 insertions(+), 38 deletions(-) diff --git a/control-plane/agents/src/bin/core/controller/registry.rs b/control-plane/agents/src/bin/core/controller/registry.rs index f775a6e1b..b00cef9e8 100644 --- a/control-plane/agents/src/bin/core/controller/registry.rs +++ b/control-plane/agents/src/bin/core/controller/registry.rs @@ -100,6 +100,8 @@ pub(crate) struct RegistryInner { thin_args: ThinArgs, /// Check if the HA feature is enabled. ha_disabled: bool, + /// Etcd max page size. + etcd_max_page_size: i64, } impl Registry { @@ -121,6 +123,7 @@ impl Registry { host_acl: Vec, thin_args: ThinArgs, ha_enabled: bool, + etcd_max_page_size: i64, ) -> Result { let store_endpoint = Self::format_store_endpoint(&store_url); tracing::info!("Connecting to persistent store at {}", store_endpoint); @@ -171,6 +174,7 @@ impl Registry { legacy_prefix_present, thin_args, ha_disabled: ha_enabled, + etcd_max_page_size, }), }; registry.init().await?; @@ -178,7 +182,7 @@ impl Registry { // Disable v1 compat if nexus_info keys are migrated. if registry.config().mayastor_compat_v1() && registry.nexus_info_v1_migrated().await? { // Delete the v1 nexus_info keys by brute force. - delete_all_v1_nexus_info(&mut store) + delete_all_v1_nexus_info(&mut store, etcd_max_page_size) .await .map_err(|error| StoreError::Generic { source: Box::new(error), @@ -406,7 +410,11 @@ impl Registry { async fn init(&self) -> Result<(), SvcError> { let mut store = self.store.lock().await; self.specs - .init(store.deref_mut(), self.legacy_prefix_present) + .init( + store.deref_mut(), + self.legacy_prefix_present, + self.etcd_max_page_size, + ) .await?; Ok(()) } diff --git a/control-plane/agents/src/bin/core/controller/resources/migration.rs b/control-plane/agents/src/bin/core/controller/resources/migration.rs index d26eda1fd..0d2e785b5 100644 --- a/control-plane/agents/src/bin/core/controller/resources/migration.rs +++ b/control-plane/agents/src/bin/core/controller/resources/migration.rs @@ -16,10 +16,13 @@ pub const MSP_OPERATOR: &str = "msp-operator"; pub(crate) async fn migrate_product_v1_to_v2( store: &mut S, spec_type: StorableObjectType, + etcd_max_page_size: i64, ) -> Result<(), StoreError> { info!("Migrating {spec_type:?} from v1 to v2 key space"); let prefix = &product_v1_key_prefix_obj(spec_type); - let store_entries = store.get_values_prefix(prefix).await?; + let store_entries = store + .get_values_paged_all(prefix, etcd_max_page_size) + .await?; for (k, v) in store_entries { let id = k .split('/') diff --git a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs index 900cd2225..2fdd7a0d6 100644 --- a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs +++ b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs @@ -905,6 +905,7 @@ impl ResourceSpecsLocked { &self, store: &mut S, legacy_prefix_present: bool, + etcd_max_page_size: i64, ) -> Result<(), SvcError> { let spec_types = [ StorableObjectType::VolumeSpec, @@ -916,7 +917,7 @@ impl ResourceSpecsLocked { StorableObjectType::AppNodeSpec, ]; for spec in &spec_types { - self.populate_specs(store, *spec, legacy_prefix_present) + self.populate_specs(store, *spec, legacy_prefix_present, etcd_max_page_size) .await .map_err(|error| SvcError::Internal { details: error.full_string(), @@ -993,22 +994,22 @@ impl ResourceSpecsLocked { store: &mut S, spec_type: StorableObjectType, legacy_prefix_present: bool, + etcd_max_page_size: i64, ) -> Result<(), SpecError> { if legacy_prefix_present { - migrate_product_v1_to_v2(store, spec_type) + migrate_product_v1_to_v2(store, spec_type, etcd_max_page_size) .await .map_err(|e| SpecError::StoreMigrate { source: Box::new(e), })?; } let prefix = key_prefix_obj(spec_type, API_VERSION); - let store_entries = - store - .get_values_prefix(&prefix) - .await - .map_err(|e| SpecError::StoreGet { - source: Box::new(e), - })?; + let store_entries = store + .get_values_paged_all(&prefix, etcd_max_page_size) + .await + .map_err(|e| SpecError::StoreGet { + source: Box::new(e), + })?; let store_values = store_entries.iter().map(|e| e.1.clone()).collect(); let mut resource_specs = self.0.write(); diff --git a/control-plane/agents/src/bin/core/main.rs b/control-plane/agents/src/bin/core/main.rs index dadadf8af..51a71208c 100644 --- a/control-plane/agents/src/bin/core/main.rs +++ b/control-plane/agents/src/bin/core/main.rs @@ -20,7 +20,7 @@ pub(crate) mod watch; use clap::Parser; use controller::registry::NumRebuilds; use std::{net::SocketAddr, num::ParseIntError}; -use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR}; +use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR, ETCD_MAX_PAGE_LIMIT}; use stor_port::HostAccessControl; use utils::tracing_telemetry::{trace::TracerProvider, KeyValue}; @@ -110,6 +110,10 @@ pub(crate) struct CliArgs { /// This is useful when the frontend nodes do not support the NVMe ANA feature. #[clap(long, env = "HA_DISABLED")] pub(crate) disable_ha: bool, + + /// Etcd Pagination Limit. + #[clap(long, default_value = ETCD_MAX_PAGE_LIMIT)] + pub(crate) etcd_page_limit: u32, } impl CliArgs { fn args() -> Self { @@ -180,6 +184,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> { }, cli_args.thin_args, cli_args.disable_ha, + cli_args.etcd_page_limit as i64, ) .await?; diff --git a/control-plane/agents/src/bin/core/tests/controller/mod.rs b/control-plane/agents/src/bin/core/tests/controller/mod.rs index 6c63d278f..92f92e8f6 100644 --- a/control-plane/agents/src/bin/core/tests/controller/mod.rs +++ b/control-plane/agents/src/bin/core/tests/controller/mod.rs @@ -1,6 +1,6 @@ use deployer_cluster::{etcd_client::Client, *}; use stor_port::{ - pstor::{etcd::Etcd, StoreObj}, + pstor::{etcd::Etcd, key_prefix_obj, ApiVersion, StorableObjectType, StoreKv, StoreObj}, types::v0::{ openapi::models, store::registry::{ControlPlaneService, StoreLeaseOwner, StoreLeaseOwnerKey}, @@ -8,6 +8,10 @@ use stor_port::{ }, }; +use serde_json::Value; +use std::str::FromStr; +use uuid::Uuid; + /// Test that the content of the registry is correctly loaded from the persistent store on start up. #[tokio::test] async fn bootstrap_registry() { @@ -212,3 +216,89 @@ async fn core_agent_lease_lock() { tracing::info!("core: {:?}", core.state); assert_eq!(Some(false), core.state.unwrap().running); } + +const OLD_VOLUME_PREFIX: &str = "/namespace/default/control-plane/VolumeSpec"; + +#[tokio::test] +async fn etcd_pagination() { + let lease_ttl = std::time::Duration::from_secs(2); + let cluster = ClusterBuilder::builder() + .with_io_engines(0) + .with_rest(false) + .with_jaeger(false) + .with_store_lease_ttl(lease_ttl) + .build() + .await + .unwrap(); + + let mut etcd = Etcd::new("0.0.0.0:2379").await.unwrap(); + + let node_prefix = key_prefix_obj(StorableObjectType::NodeSpec, ApiVersion::V0); + let volume_prefix = key_prefix_obj(StorableObjectType::VolumeSpec, ApiVersion::V0); + + // Persist some nodes in etcd. + for i in 1 .. 11 { + let key = format!("{}/node{}", node_prefix, i); + let json_str = format!( + r#"{{"id":"mayastor-node{}","endpoint":"136.144.51.107:10124","labels":{{}}}}"#, + i + ); + let value = Value::from_str(&json_str).unwrap(); + etcd.put_kv(&key, &value).await.unwrap(); + } + + // Persist some volumes in new keyspace in etcd. + for _i in 1 .. 4 { + let uuid = Uuid::new_v4(); + let key = format!("{}/{}", volume_prefix, uuid); + let json_str = r#"{"uuid":"456122b1-7e19-4148-a890-579ca785a119","size":2147483648,"labels":{"local":"true"},"num_replicas":3,"status":{"Created":"Online"},"target":{"node":"mayastor-node4","nexus":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","protocol":"nvmf"},"policy":{"self_heal":true},"topology":{"node":{"Explicit":{"allowed_nodes":["mayastor-node2","mayastor-master","mayastor-node3","mayastor-node1","mayastor-node4"],"preferred_nodes":["mayastor-node2","mayastor-node3","mayastor-node4","mayastor-master","mayastor-node1"]}},"pool":{"Labelled":{"exclusion":{},"inclusion":{"openebs.io/created-by":"msp-operator"}}}},"last_nexus_id":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","operation":null}"#; + let value = Value::from_str(json_str).unwrap(); + etcd.put_kv(&key, &value).await.unwrap(); + } + + // Persist some volumes in old key space in etcd. + for _i in 1 .. 6 { + let uuid = Uuid::new_v4(); + let key = format!("{}/{}", OLD_VOLUME_PREFIX, uuid); + let json_str = r#"{"uuid":"456122b1-7e19-4148-a890-579ca785a119","size":2147483648,"labels":{"local":"true"},"num_replicas":3,"status":{"Created":"Online"},"target":{"node":"mayastor-node4","nexus":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","protocol":"nvmf"},"policy":{"self_heal":true},"topology":{"node":{"Explicit":{"allowed_nodes":["mayastor-node2","mayastor-master","mayastor-node3","mayastor-node1","mayastor-node4"],"preferred_nodes":["mayastor-node2","mayastor-node3","mayastor-node4","mayastor-master","mayastor-node1"]}},"pool":{"Labelled":{"exclusion":{},"inclusion":{"openebs.io/created-by":"msp-operator"}}}},"last_nexus_id":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","operation":null}"#; + let value = Value::from_str(json_str).unwrap(); + etcd.put_kv(&key, &value).await.unwrap(); + } + + // Persist some nexus info in old key space in etcd. + for _i in 1 .. 6 { + let uuid = Uuid::new_v4(); + let key = format!("{}", uuid); + let json_str = r#"{"children":[{"healthy":true,"uuid":"82779efa-a0c7-4652-a37b-83eefd894714"},{"healthy":true,"uuid":"2d98fa96-ac12-40be-acdc-e3559c0b1530"},{"healthy":true,"uuid":"620ff519-419a-48d6-97a8-c1ba3260d87e"}],"clean_shutdown":false}"#; + let value = Value::from_str(json_str).unwrap(); + etcd.put_kv(&key, &value).await.unwrap(); + } + + // There Should be exactly 10 Nodes. + let node_kvs = etcd.get_values_paged_all(&node_prefix, 3).await.unwrap(); + assert_eq!(node_kvs.len(), 10); + + // There Should be exactly 5 New Volumes. + let volume_kvs = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap(); + assert_eq!(volume_kvs.len(), 3); + + // There Should be exactly 5 Old Volumes. + let volume_kvs = etcd + .get_values_paged_all(OLD_VOLUME_PREFIX, 3) + .await + .unwrap(); + assert_eq!(volume_kvs.len(), 5); + + cluster.restart_core().await; + cluster + .volume_service_liveness(None) + .await + .expect("Should have restarted by now"); + + // There Should be exactly 10 New Volumes, after the migration. + let volume_kvs_all = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap(); + assert_eq!(volume_kvs_all.len(), 8); + + let all = etcd.get_values_paged_all("", 3).await.unwrap(); + assert_eq!(all.len(), 26); +} diff --git a/control-plane/stor-port/src/types/v0/store/nexus_persistence.rs b/control-plane/stor-port/src/types/v0/store/nexus_persistence.rs index 14e4ef247..5cba1ebf5 100644 --- a/control-plane/stor-port/src/types/v0/store/nexus_persistence.rs +++ b/control-plane/stor-port/src/types/v0/store/nexus_persistence.rs @@ -8,9 +8,6 @@ use std::fmt::Debug; use tracing::info; use uuid::Uuid; -/// ETCD Pagination limit. -const ETCD_PAGED_LIMIT: i64 = 1000; - /// Definition of the nexus information that gets saved in the persistent /// store. #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)] @@ -139,17 +136,23 @@ impl StorableObject for NexusInfo { /// Deletes all v1 nexus_info by fetching all keys and parsing the key to UUID and deletes on /// success. -pub async fn delete_all_v1_nexus_info(store: &mut S) -> Result<(), StoreError> { - let mut prefix: &str = ""; +pub async fn delete_all_v1_nexus_info( + store: &mut S, + etcd_page_limit: i64, +) -> Result<(), StoreError> { let mut first = true; let mut kvs; - loop { - kvs = store.get_values_paged(prefix, ETCD_PAGED_LIMIT).await?; + let mut last = Some("".to_string()); + + while let Some(prefix) = &last { + kvs = store.get_values_paged(prefix, etcd_page_limit, "").await?; + if !first && kvs.get(0).is_some() { kvs.remove(0); } first = false; + last = kvs.last().map(|(key, _)| key.to_string()); // If the key is a uuid, i.e. nexus_info v1 key, and the value is a valid nexus_info then we // delete it. for (key, value) in &kvs { @@ -159,12 +162,6 @@ pub async fn delete_all_v1_nexus_info(store: &mut S) -> Result<(), Sto store.delete_kv(&key).await?; } } - - if let Some((key, _)) = kvs.last() { - prefix = key; - } else { - break; - } } info!("v1.0.x nexus_info cleaned up successfully"); Ok(()) diff --git a/utils/pstor/src/api.rs b/utils/pstor/src/api.rs index 35bb27e67..6758f00a3 100644 --- a/utils/pstor/src/api.rs +++ b/utils/pstor/src/api.rs @@ -33,6 +33,14 @@ pub trait StoreKv: Sync + Send + Clone { &mut self, key_prefix: &str, limit: i64, + range_end: &str, + ) -> Result, Error>; + /// Returns a vector of tuples. Each tuple represents a key-value pair. It paginates through all + /// the values for the prefix with limit. + async fn get_values_paged_all( + &mut self, + key_prefix: &str, + limit: i64, ) -> Result, Error>; /// Deletes all key values from a given prefix. async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error>; diff --git a/utils/pstor/src/error.rs b/utils/pstor/src/error.rs index 95efcd9e6..6b80586eb 100644 --- a/utils/pstor/src/error.rs +++ b/utils/pstor/src/error.rs @@ -86,4 +86,6 @@ pub enum Error { source: Box, description: String, }, + #[snafu(display("Failed to parse range end for start key: '{}'", start_key))] + RangeEnd { start_key: String }, } diff --git a/utils/pstor/src/etcd.rs b/utils/pstor/src/etcd.rs index 67bcf7b0e..1fa7be630 100644 --- a/utils/pstor/src/etcd.rs +++ b/utils/pstor/src/etcd.rs @@ -221,23 +221,27 @@ impl StoreKv for Etcd { &mut self, key_prefix: &str, limit: i64, + range_end: &str, ) -> Result, Error> { if limit <= 2 { return Err(Error::PagedMinimum); } + let get_options = if range_end.is_empty() { + GetOptions::new() + .with_from_key() + .with_sort(SortTarget::Key, SortOrder::Ascend) + .with_limit(limit) + } else { + GetOptions::new() + .with_range(range_end) + .with_sort(SortTarget::Key, SortOrder::Ascend) + .with_limit(limit) + }; + let resp = self .client - .get( - key_prefix, - Some( - GetOptions::new() - .with_prefix() - .with_from_key() - .with_sort(SortTarget::Key, SortOrder::Ascend) - .with_limit(limit), - ), - ) + .get(key_prefix, Some(get_options)) .await .context(GetPrefix { prefix: key_prefix })?; @@ -256,6 +260,37 @@ impl StoreKv for Etcd { Ok(result) } + /// Returns a vector of tuples. Each tuple represents a key-value pair. It paginates through all + /// the values for the prefix with limit. + async fn get_values_paged_all( + &mut self, + key_prefix: &str, + limit: i64, + ) -> Result, Error> { + let range_end = get_prefix_range_end(key_prefix).map_err(|_| Error::RangeEnd { + start_key: key_prefix.to_string(), + })?; + + let mut first = true; + let mut all_values = vec![]; + let mut values; + let mut last = Some(key_prefix.to_string()); + + while let Some(prefix) = &last { + values = self.get_values_paged(prefix, limit, &range_end).await?; + + if !first && values.get(0).is_some() { + values.remove(0); + } + first = false; + + last = values.last().map(|(key, _)| key.to_string()); + + all_values.extend(values); + } + Ok(all_values) + } + /// Deletes objects with the given key prefix. async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error> { if let Some((lease_id, lock_key)) = self.lease_lock()? { @@ -425,3 +460,16 @@ pub fn build_key_prefix(_platform: impl platform::PlatformInfo) -> String { //crate::types::v0::store::definitions::build_key_prefix(&platform, namespace) "".to_string() } + +/// Returns the range end key for the given start key. If the start key is "abc", the end key will +/// be "abd". It will return the start key if its empty. +pub fn get_prefix_range_end(prefix: &str) -> Result { + let mut end = prefix.as_bytes().to_vec(); + for byte in end.iter_mut().rev() { + if *byte < 0xff { + *byte += 1; + break; + } + } + String::from_utf8(end) +} diff --git a/utils/pstor/src/products/v1.rs b/utils/pstor/src/products/v1.rs index 2d5da5afb..f3dda12e4 100644 --- a/utils/pstor/src/products/v1.rs +++ b/utils/pstor/src/products/v1.rs @@ -23,6 +23,6 @@ pub fn key_prefix_obj>(key_type: K) -> String { /// Fetches the product v1 key prefix and returns true if entry is present. pub async fn detect_product_v1_prefix(store: &mut S) -> Result { - let prefix = store.get_values_prefix(&key_prefix()).await?; + let prefix = store.get_values_paged(&key_prefix(), 3, "").await?; Ok(!prefix.is_empty()) } diff --git a/utils/utils-lib/src/constants.rs b/utils/utils-lib/src/constants.rs index 5d0472fc3..00c105ae2 100644 --- a/utils/utils-lib/src/constants.rs +++ b/utils/utils-lib/src/constants.rs @@ -131,3 +131,6 @@ pub const SNAPSHOT_MAX_TRANSACTION_LIMIT: usize = 5; /// Label for the csi-node nvme ana multi-path. pub const CSI_NODE_NVME_ANA: &str = "openebs.io/csi-node.nvme-ana"; + +/// Max limit for etcd pagination. +pub const ETCD_MAX_PAGE_LIMIT: &str = "500";