diff --git a/composer/src/lib.rs b/composer/src/lib.rs
index c424d8c0a..50f6ac59b 100644
--- a/composer/src/lib.rs
+++ b/composer/src/lib.rs
@@ -56,10 +56,10 @@ impl RpcHandle {
             }
         }
 
-        let mayastor = MayastorClient::connect(format!("http://{}", endpoint.to_string()))
+        let mayastor = MayastorClient::connect(format!("http://{}", endpoint))
            .await
            .unwrap();
-        let bdev = BdevRpcClient::connect(format!("http://{}", endpoint.to_string()))
+        let bdev = BdevRpcClient::connect(format!("http://{}", endpoint))
            .await
            .unwrap();
diff --git a/control-plane/agents/common/src/errors.rs b/control-plane/agents/common/src/errors.rs
index 5d8f58207..ade2ccbd5 100644
--- a/control-plane/agents/common/src/errors.rs
+++ b/control-plane/agents/common/src/errors.rs
@@ -192,6 +192,11 @@ pub enum SvcError {
     StoreMissingEntry { key: String },
     #[snafu(display("The uuid '{}' for kind '{}' is not valid.", uuid, kind.to_string()))]
     InvalidUuid { uuid: String, kind: ResourceKind },
+    #[snafu(display(
+        "Unable to start rebuild. Maximum number of rebuilds permitted is {}",
+        max_rebuilds
+    ))]
+    MaxRebuilds { max_rebuilds: u32 },
 }
 
 impl From<StoreError> for SvcError {
@@ -534,6 +539,12 @@ impl From<SvcError> for ReplyError {
                 source: desc.to_string(),
                 extra: error.full_string(),
             },
+            SvcError::MaxRebuilds { .. } => ReplyError {
+                kind: ReplyErrorKind::ResourceExhausted,
+                resource: ResourceKind::Volume,
+                source: desc.to_string(),
+                extra: error.full_string(),
+            },
         }
     }
 }
diff --git a/control-plane/agents/core/src/core/registry.rs b/control-plane/agents/core/src/core/registry.rs
index efd94d637..1baec183b 100644
--- a/control-plane/agents/core/src/core/registry.rs
+++ b/control-plane/agents/core/src/core/registry.rs
@@ -54,6 +54,9 @@ impl Deref for Registry {
     }
 }
 
+/// Number of rebuilds
+pub(crate) type NumRebuilds = u32;
+
 /// Generic Registry Inner with a Store trait
 #[derive(Debug)]
 pub struct RegistryInner<S: Store> {
@@ -72,6 +75,8 @@ pub struct RegistryInner<S: Store> {
     reconcile_period: std::time::Duration,
     reconciler: ReconcilerControl,
     config: CoreRegistryConfig,
+    /// system-wide maximum number of concurrent rebuilds allowed
+    max_rebuilds: Option<NumRebuilds>,
 }
 
 impl Registry {
@@ -85,6 +90,7 @@ impl Registry {
         store_lease_tll: std::time::Duration,
         reconcile_period: std::time::Duration,
         reconcile_idle_period: std::time::Duration,
+        max_rebuilds: Option<NumRebuilds>,
     ) -> Self {
         let store_endpoint = Self::format_store_endpoint(&store_url);
         tracing::info!("Connecting to persistent store at {}", store_endpoint);
@@ -107,6 +113,7 @@ impl Registry {
                 reconcile_idle_period,
                 reconciler: ReconcilerControl::new(),
                 config: Self::get_config_or_panic(store).await,
+                max_rebuilds,
             }),
         };
         registry.init().await;
@@ -277,4 +284,27 @@ impl Registry {
             tokio::time::sleep(self.cache_period).await;
         }
     }
+
+    /// Determine if a rebuild is allowed to start.
+    /// Constrain the number of system-wide rebuilds to the maximum specified.
+    /// If a maximum is not specified, do not limit the number of rebuilds.
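+    /// The count is derived from the per-node rebuild totals cached by each
+    /// `NodeWrapper`, so it reflects the most recently polled nexus states.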
+    pub(crate) async fn rebuild_allowed(&self) -> Result<(), SvcError> {
+        match self.max_rebuilds {
+            Some(max_rebuilds) => {
+                let mut num_rebuilds = 0;
+                for (_id, node_wrapper) in self.nodes.read().await.iter() {
+                    num_rebuilds += node_wrapper.read().await.num_rebuilds();
+                }
+
+                if num_rebuilds < max_rebuilds {
+                    Ok(())
+                } else {
+                    Err(SvcError::MaxRebuilds { max_rebuilds })
+                }
+            }
+            None => Ok(()),
+        }
+    }
 }
diff --git a/control-plane/agents/core/src/core/wrapper.rs b/control-plane/agents/core/src/core/wrapper.rs
index a1f4e0c27..be6b77e47 100644
--- a/control-plane/agents/core/src/core/wrapper.rs
+++ b/control-plane/agents/core/src/core/wrapper.rs
@@ -5,6 +5,7 @@ use crate::{
         states::{ResourceStates, ResourceStatesLocked},
     },
     node::service::NodeCommsTimeout,
+    NumRebuilds,
 };
 
 use common::{
@@ -26,7 +27,8 @@ use common_lib::{
 };
 
 use async_trait::async_trait;
-use common_lib::types::v0::store::ResourceUuid;
+use common_lib::types::v0::{message_bus, store::ResourceUuid};
+use parking_lot::RwLock;
 use rpc::mayastor::Null;
 use snafu::ResultExt;
 use std::{
@@ -39,6 +41,13 @@
 type NodeResourceStates = (Vec<Replica>, Vec<PoolState>, Vec<Nexus>);
 
 /// Default timeout for GET* gRPC requests (ex: GetPools, GetNexuses, etc..)
 const GETS_TIMEOUT: MessageIdVs = MessageIdVs::Default;
 
+enum ResourceType {
+    All(Vec<message_bus::PoolState>, Vec<message_bus::Replica>, Vec<message_bus::Nexus>),
+    Nexus(Vec<message_bus::Nexus>),
+    Pool(Vec<message_bus::PoolState>),
+    Replica(Vec<message_bus::Replica>),
+}
+
 /// Wrapper over a `Node` plus a few useful methods/properties. Includes:
 /// all pools and replicas from the node
 /// a watchdog to keep track of the node's liveness
@@ -60,6 +69,8 @@ pub(crate) struct NodeWrapper {
     comms_timeouts: NodeCommsTimeout,
     /// runtime state information
     states: ResourceStatesLocked,
+    /// number of rebuilds in progress on the node
+    num_rebuilds: Arc<RwLock<NumRebuilds>>,
 }
 
 impl NodeWrapper {
@@ -77,6 +88,7 @@
             lock: Default::default(),
             comms_timeouts,
             states: ResourceStatesLocked::new(),
+            num_rebuilds: Arc::new(RwLock::new(0)),
         }
     }
 
@@ -402,7 +414,7 @@ impl NodeWrapper {
 
         match fetch_result {
             Ok((replicas, pools, nexuses)) => {
-                self.resources_mut().update(pools, replicas, nexuses);
+                self.update_resources(ResourceType::All(pools, replicas, nexuses));
                 if setting_online {
                     // we only set it as online after we've updated the resource states
                     // so an online node should be "up-to-date"
@@ -520,23 +532,61 @@ impl NodeWrapper {
     /// Update all the nexus states.
     async fn update_nexus_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
         let nexuses = self.fetch_nexuses(client).await?;
-        self.resources_mut().update_nexuses(nexuses);
+        self.update_resources(ResourceType::Nexus(nexuses));
         Ok(())
     }
 
     /// Update all the pool states.
     async fn update_pool_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
         let pools = self.fetch_pools(client).await?;
-        self.resources_mut().update_pools(pools);
+        self.update_resources(ResourceType::Pool(pools));
         Ok(())
     }
 
     /// Update all the replica states.
     async fn update_replica_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
         let replicas = self.fetch_replicas(client).await?;
-        self.resources_mut().update_replicas(replicas);
+        self.update_resources(ResourceType::Replica(replicas));
         Ok(())
     }
+
+    /// Update the states of the specified resource type.
+    /// Whenever the nexus states are updated the number of rebuilds must be updated.
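+    /// Pool and replica updates have no bearing on rebuilds, so only the `All`
+    /// and `Nexus` variants refresh the cached rebuild count.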
+    fn update_resources(&self, resource_type: ResourceType) {
+        match resource_type {
+            ResourceType::All(pools, replicas, nexuses) => {
+                self.resources_mut().update(pools, replicas, nexuses);
+                self.update_num_rebuilds();
+            }
+            ResourceType::Nexus(nexuses) => {
+                self.resources_mut().update_nexuses(nexuses);
+                self.update_num_rebuilds();
+            }
+            ResourceType::Pool(pools) => {
+                self.resources_mut().update_pools(pools);
+            }
+            ResourceType::Replica(replicas) => {
+                self.resources_mut().update_replicas(replicas);
+            }
+        }
+    }
+
+    /// Update the number of rebuilds in progress on this node.
+    fn update_num_rebuilds(&self) {
+        let mut num_rebuilds = 0;
+        self.nexus_states().iter().for_each(|nexus_state| {
+            num_rebuilds += nexus_state.nexus.rebuilds;
+        });
+        let mut rebuilds = self.num_rebuilds.write();
+        *rebuilds = num_rebuilds;
+    }
+
+    /// Return the number of rebuilds in progress on this node.
+    pub(crate) fn num_rebuilds(&self) -> NumRebuilds {
+        *self.num_rebuilds.read()
+    }
 }
 
 /// CRUD Operations on a locked mayastor `NodeWrapper` such as:
diff --git a/control-plane/agents/core/src/server.rs b/control-plane/agents/core/src/server.rs
index 8030881d9..8361eed54 100644
--- a/control-plane/agents/core/src/server.rs
+++ b/control-plane/agents/core/src/server.rs
@@ -9,6 +9,7 @@
 use crate::core::registry;
 use common::Service;
 use common_lib::types::v0::message_bus::ChannelVs;
+use crate::registry::NumRebuilds;
 use common_lib::{mbus_api::BusClient, opentelemetry::default_tracing_tags};
 use opentelemetry::{global, sdk::propagation::TraceContextPropagator, KeyValue};
 use structopt::StructOpt;
@@ -73,6 +74,11 @@ pub(crate) struct CliArgs {
     /// Trace rest requests to the Jaeger endpoint agent
     #[structopt(long, short)]
     jaeger: Option<String>,
+
+    /// The maximum number of system-wide rebuilds permitted at any given time.
+    /// If `None`, do not limit the number of rebuilds.
+    #[structopt(long)]
+    max_rebuilds: Option<NumRebuilds>,
 }
 impl CliArgs {
     fn args() -> Self {
@@ -148,6 +154,7 @@ async fn server(cli_args: CliArgs) {
         cli_args.store_lease_ttl.into(),
         cli_args.reconcile_period.into(),
         cli_args.reconcile_idle_period.into(),
+        cli_args.max_rebuilds,
     )
     .await;
diff --git a/control-plane/agents/core/src/volume/specs.rs b/control-plane/agents/core/src/volume/specs.rs
index aa4a3c4c2..9b7ad39af 100644
--- a/control-plane/agents/core/src/volume/specs.rs
+++ b/control-plane/agents/core/src/volume/specs.rs
@@ -1032,6 +1032,10 @@ impl ResourceSpecsLocked {
         replica: &Replica,
         mode: OperationMode,
     ) -> Result<(), SvcError> {
+        // Adding a replica to a nexus will initiate a rebuild.
+        // First check that we are able to start a rebuild.
+        registry.rebuild_allowed().await?;
+
         let uri = self
             .make_replica_accessible(registry, replica, &nexus.node, mode)
             .await?;
diff --git a/deployer/src/infra/mod.rs b/deployer/src/infra/mod.rs
index 65b6c81c1..bc3e5a0c0 100644
--- a/deployer/src/infra/mod.rs
+++ b/deployer/src/infra/mod.rs
@@ -142,6 +142,9 @@ macro_rules! impl_ctrlp_agents {
                     let jaeger_config = format!("jaeger.{}:6831", cfg.get_name());
                     binary = binary.with_args(vec!["--jaeger", &jaeger_config]);
                 }
+                if let Some(max_rebuilds) = &options.max_rebuilds {
+                    binary = binary.with_args(vec!["--max-rebuilds", &max_rebuilds.to_string()]);
+                }
             }
             if let Some(size) = &options.otel_max_batch_size {
                 binary = binary.with_env("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", size);
diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs
index 9c33074ca..1e0754300 100644
--- a/deployer/src/lib.rs
+++ b/deployer/src/lib.rs
@@ -238,6 +238,10 @@ pub struct StartOptions {
     /// Add process service tags to the traces
     #[structopt(short, long, env = "TRACING_TAGS", value_delimiter=",", parse(try_from_str = common_lib::opentelemetry::parse_key_value))]
     tracing_tags: Vec<KeyValue>,
+
+    /// Maximum number of concurrent rebuilds across the cluster.
+    #[structopt(long)]
+    max_rebuilds: Option<u32>,
 }
 
 /// List of KeyValues
@@ -364,6 +368,10 @@ impl StartOptions {
         });
         self
     }
+    pub fn with_max_rebuilds(mut self, max: Option<u32>) -> Self {
+        self.max_rebuilds = max;
+        self
+    }
 }
 
 impl CliArgs {
diff --git a/shell.nix b/shell.nix
index 199004304..eb8c489c4 100644
--- a/shell.nix
+++ b/shell.nix
@@ -39,6 +39,7 @@ mkShell {
     tini
     nvme-cli
     fio
+    git
   ] ++ pkgs.lib.optional (!norust) channel.default.nightly;
 
   LIBCLANG_PATH = "${llvmPackages_11.libclang.lib}/lib";
diff --git a/tests/bdd/features/rebuild/feature.feature b/tests/bdd/features/rebuild/feature.feature
new file mode 100644
index 000000000..72353ca80
--- /dev/null
+++ b/tests/bdd/features/rebuild/feature.feature
@@ -0,0 +1,12 @@
+Feature: Rebuilding a volume
+
+  Scenario: exceeding the maximum number of rebuilds when increasing the replica count
+    Given a user defined maximum number of rebuilds
+    And an existing published volume
+    Then adding a replica should fail if doing so would exceed the maximum number of rebuilds
+
+  Scenario: exceeding the maximum number of rebuilds when replacing a replica
+    Given a user defined maximum number of rebuilds
+    And an existing published volume
+    When a replica is faulted
+    Then replacing the replica should fail if doing so would exceed the maximum number of rebuilds
\ No newline at end of file
diff --git a/tests/bdd/test_ana_validate_nexus_swap.py b/tests/bdd/test_ana_validate_nexus_swap.py
index 053da71de..7d7143eba 100644
--- a/tests/bdd/test_ana_validate_nexus_swap.py
+++ b/tests/bdd/test_ana_validate_nexus_swap.py
@@ -1,6 +1,5 @@
 """Swap ANA enabled Nexus on ANA enabled host feature tests."""
 import http
-from time import sleep
 
 from pytest_bdd import (
     given,
diff --git a/tests/bdd/test_rebuild.py b/tests/bdd/test_rebuild.py
new file mode 100644
index 000000000..77823b788
--- /dev/null
+++ b/tests/bdd/test_rebuild.py
@@ -0,0 +1,154 @@
+"""Rebuilding a volume feature tests."""
+
+from pytest_bdd import (
+    given,
+    scenario,
+    then,
+    when,
+)
+
+import pytest
+import http
+import time
+from retrying import retry
+
+from common.deployer import Deployer
+from common.apiclient import ApiClient
+from common.docker import Docker
+
+from openapi.model.create_pool_body import CreatePoolBody
+from openapi.model.create_volume_body import CreateVolumeBody
+from openapi.model.protocol import Protocol
+from openapi.exceptions import ApiException
+from openapi.model.volume_status import VolumeStatus
+from openapi.model.volume_policy import VolumePolicy
+
+VOLUME_UUID = "5cd5378e-3f05-47f1-a830-a0f5873a1449"
+VOLUME_SIZE = 10485761
+NUM_VOLUME_REPLICAS = 2
+NODE_1_NAME = "mayastor-1"
+NODE_2_NAME = "mayastor-2"
+NODE_3_NAME = "mayastor-3"
+POOL_1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980"
+POOL_2_UUID = "91a60318-bcfe-4e36-92cb-ddc7abf212ea"
+POOL_3_UUID = "4d471e62-ca17-44d1-a6d3-8820f6156c1a"
+RECONCILE_PERIOD_SECS = 1
+MAX_REBUILDS = 0  # Prevent all rebuilds
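+# With the maximum set to 0, any operation that would trigger a rebuild is
+# expected to be rejected, which makes the scenarios below deterministic.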
"mayastor-3" +POOL_1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" +POOL_2_UUID = "91a60318-bcfe-4e36-92cb-ddc7abf212ea" +POOL_3_UUID = "4d471e62-ca17-44d1-a6d3-8820f6156c1a" +RECONCILE_PERIOD_SECS = 1 +MAX_REBUILDS = 0 # Prevent all rebuilds + + +@pytest.fixture(autouse=True) +def init(): + Deployer.start_with_args( + [ + "-m=3", + f"--max-rebuilds={MAX_REBUILDS}", + "-w=10s", + "--reconcile-idle-period=1s", + f"--reconcile-period={RECONCILE_PERIOD_SECS}s", + "--cache-period=1s", + ] + ) + + # Only create 2 pools so we can control where the intial replicas are placed. + ApiClient.pools_api().put_node_pool( + NODE_1_NAME, POOL_1_UUID, CreatePoolBody(["malloc:///disk?size_mb=50"]) + ) + ApiClient.pools_api().put_node_pool( + NODE_2_NAME, POOL_2_UUID, CreatePoolBody(["malloc:///disk?size_mb=50"]) + ) + + yield + Deployer.stop() + + +@scenario( + "features/rebuild/feature.feature", + "exceeding the maximum number of rebuilds when increasing the replica count", +) +def test_exceeding_the_maximum_number_of_rebuilds_when_increasing_the_replica_count(): + """exceeding the maximum number of rebuilds when increasing the replica count.""" + + +@scenario( + "features/rebuild/feature.feature", + "exceeding the maximum number of rebuilds when replacing a replica", +) +def test_exceeding_the_maximum_number_of_rebuilds_when_replacing_a_replica(): + """exceeding the maximum number of rebuilds when replacing a replica.""" + + +@given("a user defined maximum number of rebuilds") +def a_user_defined_maximum_number_of_rebuilds(): + """a user defined maximum number of rebuilds.""" + + +@given("an existing published volume") +def an_existing_published_volume(): + """an existing published volume.""" + request = CreateVolumeBody(VolumePolicy(True), NUM_VOLUME_REPLICAS, VOLUME_SIZE) + ApiClient.volumes_api().put_volume(VOLUME_UUID, request) + ApiClient.volumes_api().put_volume_target( + VOLUME_UUID, NODE_1_NAME, Protocol("nvmf") + ) + + # Now the volume has been created, create the additional pool. + ApiClient.pools_api().put_node_pool( + NODE_3_NAME, POOL_3_UUID, CreatePoolBody(["malloc:///disk?size_mb=50"]) + ) + + +@when("a replica is faulted") +def a_replica_is_faulted(): + """a replica is faulted.""" + # Fault a replica by stopping the container with the replica. + # Check the replica becomes unhealthy by waiting for the volume to become degraded. + Docker.stop_container(NODE_2_NAME) + wait_for_degraded_volume() + + +@then( + "adding a replica should fail if doing so would exceed the maximum number of rebuilds" +) +def adding_a_replica_should_fail_if_doing_so_would_exceed_the_maximum_number_of_rebuilds(): + """adding a replica should fail if doing so would exceed the maximum number of rebuilds.""" + pass + try: + ApiClient.volumes_api().put_volume_replica_count( + VOLUME_UUID, NUM_VOLUME_REPLICAS + 1 + ) + except ApiException as e: + assert e.status == http.HTTPStatus.INSUFFICIENT_STORAGE + + +@then( + "replacing the replica should fail if doing so would exceed the maximum number of rebuilds" +) +def replacing_the_replica_should_fail_if_doing_so_would_exceed_the_maximum_number_of_rebuilds(): + """replacing the replica should fail if doing so would exceed the maximum number of rebuilds.""" + wait_for_replica_removal() + # Check that a replica doesn't get added to the volume. + # This should be prevented because it would exceed the number of max rebuilds. 
+    for _ in range(10):
+        check_replica_not_added()
+        time.sleep(RECONCILE_PERIOD_SECS)
+
+
+@retry(wait_fixed=1000, stop_max_attempt_number=10)
+def wait_for_degraded_volume():
+    volume = ApiClient.volumes_api().get_volume(VOLUME_UUID)
+    assert volume.state.status == VolumeStatus("Degraded")
+
+
+@retry(wait_fixed=1000, stop_max_attempt_number=10)
+def wait_for_replica_removal():
+    volume = ApiClient.volumes_api().get_volume(VOLUME_UUID)
+    assert len(volume.state.target["children"]) == NUM_VOLUME_REPLICAS - 1
+
+
+def check_replica_not_added():
+    volume = ApiClient.volumes_api().get_volume(VOLUME_UUID)
+    # Strictly fewer children than desired: the faulted replica has been
+    # removed and no replacement may be added while rebuilds are blocked.
+    assert len(volume.state.target["children"]) < NUM_VOLUME_REPLICAS