Skip to content

Commit

Permalink
feat(rebuild): system-wide maximum
Browse files Browse the repository at this point in the history
Introduce a system-wide maximum number of rebuilds. The maximum is
specified as an argument (max_rebuilds) to the core agent. If a maximum
is not specified, the number of rebuilds will not be limited. This
behaviour ensures backwards compatibility.
  • Loading branch information
Paul Yoong authored and tiagolobocastro committed Jun 7, 2022
1 parent 48fcd77 commit e60a999
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 8 deletions.
4 changes: 2 additions & 2 deletions composer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl RpcHandle {
}
}

let mayastor = MayastorClient::connect(format!("http://{}", endpoint.to_string()))
let mayastor = MayastorClient::connect(format!("http://{}", endpoint))
.await
.unwrap();
let bdev = BdevRpcClient::connect(format!("http://{}", endpoint.to_string()))
let bdev = BdevRpcClient::connect(format!("http://{}", endpoint))
.await
.unwrap();

Expand Down
11 changes: 11 additions & 0 deletions control-plane/agents/common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ pub enum SvcError {
StoreMissingEntry { key: String },
#[snafu(display("The uuid '{}' for kind '{}' is not valid.", uuid, kind.to_string()))]
InvalidUuid { uuid: String, kind: ResourceKind },
#[snafu(display(
"Unable to start rebuild. Maximum number of rebuilds permitted is {}",
max_rebuilds
))]
MaxRebuilds { max_rebuilds: u32 },
}

impl From<StoreError> for SvcError {
Expand Down Expand Up @@ -534,6 +539,12 @@ impl From<SvcError> for ReplyError {
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::MaxRebuilds { .. } => ReplyError {
kind: ReplyErrorKind::ResourceExhausted,
resource: ResourceKind::Volume,
source: desc.to_string(),
extra: error.full_string(),
},
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions control-plane/agents/core/src/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl Deref for Registry {
}
}

/// Count of rebuilds.
/// Used as the unit for both the number of rebuilds in progress and the
/// optional system-wide maximum number of rebuilds.
pub(crate) type NumRebuilds = u32;

/// Generic Registry Inner with a Store trait
#[derive(Debug)]
pub struct RegistryInner<S: Store> {
Expand All @@ -72,6 +75,8 @@ pub struct RegistryInner<S: Store> {
reconcile_period: std::time::Duration,
reconciler: ReconcilerControl,
config: CoreRegistryConfig,
/// system-wide maximum number of concurrent rebuilds allowed
max_rebuilds: Option<NumRebuilds>,
}

impl Registry {
Expand All @@ -85,6 +90,7 @@ impl Registry {
store_lease_tll: std::time::Duration,
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
max_rebuilds: Option<NumRebuilds>,
) -> Self {
let store_endpoint = Self::format_store_endpoint(&store_url);
tracing::info!("Connecting to persistent store at {}", store_endpoint);
Expand All @@ -107,6 +113,7 @@ impl Registry {
reconcile_idle_period,
reconciler: ReconcilerControl::new(),
config: Self::get_config_or_panic(store).await,
max_rebuilds,
}),
};
registry.init().await;
Expand Down Expand Up @@ -277,4 +284,25 @@ impl Registry {
tokio::time::sleep(self.cache_period).await;
}
}

/// Check whether a new rebuild may be started right now.
/// When a system-wide maximum is configured, the rebuild counts reported by
/// every node are added up and compared against it; once the total has reached
/// the maximum `SvcError::MaxRebuilds` is returned.
/// When no maximum is configured rebuilds are never limited.
pub(crate) async fn rebuild_allowed(&self) -> Result<(), SvcError> {
    let max_rebuilds = match self.max_rebuilds {
        Some(limit) => limit,
        // no limit configured: always allow
        None => return Ok(()),
    };

    // total number of rebuilds across all the nodes in the registry
    let mut in_progress = 0;
    for (_id, node) in self.nodes.read().await.iter() {
        in_progress += node.read().await.num_rebuilds();
    }

    if in_progress >= max_rebuilds {
        return Err(SvcError::MaxRebuilds { max_rebuilds });
    }
    Ok(())
}
}
58 changes: 53 additions & 5 deletions control-plane/agents/core/src/core/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
states::{ResourceStates, ResourceStatesLocked},
},
node::service::NodeCommsTimeout,
NumRebuilds,
};

use common::{
Expand All @@ -26,7 +27,8 @@ use common_lib::{
};

use async_trait::async_trait;
use common_lib::types::v0::store::ResourceUuid;
use common_lib::types::v0::{message_bus, store::ResourceUuid};
use parking_lot::RwLock;
use rpc::mayastor::Null;
use snafu::ResultExt;
use std::{
Expand All @@ -39,6 +41,13 @@ type NodeResourceStates = (Vec<Replica>, Vec<PoolState>, Vec<Nexus>);
/// Default timeout for GET* gRPC requests (ex: GetPools, GetNexuses, etc..)
const GETS_TIMEOUT: MessageIdVs = MessageIdVs::Default;

/// Selects which resource states `update_resources` should replace, carrying
/// the new states to store.
enum ResourceType {
/// Replace the pool, replica and nexus states in one go.
All(Vec<message_bus::PoolState>, Vec<Replica>, Vec<Nexus>),
/// Replace only the nexus states.
Nexus(Vec<Nexus>),
/// Replace only the pool states.
Pool(Vec<message_bus::PoolState>),
/// Replace only the replica states.
Replica(Vec<Replica>),
}

/// Wrapper over a `Node` plus a few useful methods/properties. Includes:
/// all pools and replicas from the node
/// a watchdog to keep track of the node's liveness
Expand All @@ -60,6 +69,8 @@ pub(crate) struct NodeWrapper {
comms_timeouts: NodeCommsTimeout,
/// runtime state information
states: ResourceStatesLocked,
/// number of rebuilds in progress on the node
num_rebuilds: Arc<RwLock<NumRebuilds>>,
}

impl NodeWrapper {
Expand All @@ -77,6 +88,7 @@ impl NodeWrapper {
lock: Default::default(),
comms_timeouts,
states: ResourceStatesLocked::new(),
num_rebuilds: Arc::new(RwLock::new(0)),
}
}

Expand Down Expand Up @@ -402,7 +414,7 @@ impl NodeWrapper {

match fetch_result {
Ok((replicas, pools, nexuses)) => {
self.resources_mut().update(pools, replicas, nexuses);
self.update_resources(ResourceType::All(pools, replicas, nexuses));
if setting_online {
// we only set it as online after we've updated the resource states
// so an online node should be "up-to-date"
Expand Down Expand Up @@ -520,23 +532,59 @@ impl NodeWrapper {
/// Update all the nexus states.
async fn update_nexus_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
let nexuses = self.fetch_nexuses(client).await?;
self.resources_mut().update_nexuses(nexuses);
self.update_resources(ResourceType::Nexus(nexuses));
Ok(())
}

/// Update all the pool states.
async fn update_pool_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
let pools = self.fetch_pools(client).await?;
self.resources_mut().update_pools(pools);
self.update_resources(ResourceType::Pool(pools));
Ok(())
}

/// Update all the replica states.
async fn update_replica_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> {
let replicas = self.fetch_replicas(client).await?;
self.resources_mut().update_replicas(replicas);
self.update_resources(ResourceType::Replica(replicas));
Ok(())
}

/// Replace the stored states for the given kind of resource.
/// The per-node rebuild count is derived from the nexus states, so it is
/// refreshed every time the nexus states are replaced.
fn update_resources(&self, resource_type: ResourceType) {
    // NOTE(review): every `resources_mut()` result is kept as a statement-scoped
    // temporary so that it is released before `update_num_rebuilds` reads the
    // nexus states — confirm against the guard type it returns.
    let nexuses_replaced = match resource_type {
        ResourceType::All(pools, replicas, nexuses) => {
            self.resources_mut().update(pools, replicas, nexuses);
            true
        }
        ResourceType::Nexus(nexuses) => {
            self.resources_mut().update_nexuses(nexuses);
            true
        }
        ResourceType::Pool(pools) => {
            self.resources_mut().update_pools(pools);
            false
        }
        ResourceType::Replica(replicas) => {
            self.resources_mut().update_replicas(replicas);
            false
        }
    };

    if nexuses_replaced {
        self.update_num_rebuilds();
    }
}

/// Recompute the number of rebuilds in progress on this node by adding up the
/// `rebuilds` reported by each of its nexus states, then store the result.
fn update_num_rebuilds(&self) {
    let mut total = 0;
    for state in self.nexus_states().iter() {
        total += state.nexus.rebuilds;
    }
    // the counter is only locked once the total is known
    *self.num_rebuilds.write() = total;
}

/// Number of rebuilds in progress on this node, as last stored by
/// `update_num_rebuilds`.
pub(crate) fn num_rebuilds(&self) -> NumRebuilds {
    let count = self.num_rebuilds.read();
    *count
}
}

/// CRUD Operations on a locked mayastor `NodeWrapper` such as:
Expand Down
7 changes: 7 additions & 0 deletions control-plane/agents/core/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::core::registry;
use common::Service;
use common_lib::types::v0::message_bus::ChannelVs;

use crate::registry::NumRebuilds;
use common_lib::{mbus_api::BusClient, opentelemetry::default_tracing_tags};
use opentelemetry::{global, sdk::propagation::TraceContextPropagator, KeyValue};
use structopt::StructOpt;
Expand Down Expand Up @@ -73,6 +74,11 @@ pub(crate) struct CliArgs {
/// Trace rest requests to the Jaeger endpoint agent
#[structopt(long, short)]
jaeger: Option<String>,

/// The maximum number of system-wide rebuilds permitted at any given time.
/// If `None` do not limit the number of rebuilds.
#[structopt(long)]
max_rebuilds: Option<NumRebuilds>,
}
impl CliArgs {
fn args() -> Self {
Expand Down Expand Up @@ -148,6 +154,7 @@ async fn server(cli_args: CliArgs) {
cli_args.store_lease_ttl.into(),
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.max_rebuilds,
)
.await;

Expand Down
4 changes: 4 additions & 0 deletions control-plane/agents/core/src/volume/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,10 @@ impl ResourceSpecsLocked {
replica: &Replica,
mode: OperationMode,
) -> Result<(), SvcError> {
// Adding a replica to a nexus will initiate a rebuild.
// First check that we are able to start a rebuild.
registry.rebuild_allowed().await?;

let uri = self
.make_replica_accessible(registry, replica, &nexus.node, mode)
.await?;
Expand Down
3 changes: 3 additions & 0 deletions deployer/src/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ macro_rules! impl_ctrlp_agents {
let jaeger_config = format!("jaeger.{}:6831", cfg.get_name());
binary = binary.with_args(vec!["--jaeger", &jaeger_config]);
}
if let Some(max_rebuilds) = &options.max_rebuilds {
binary = binary.with_args(vec!["--max-rebuilds", &max_rebuilds.to_string()]);
}
}
if let Some(size) = &options.otel_max_batch_size {
binary = binary.with_env("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", size);
Expand Down
8 changes: 8 additions & 0 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ pub struct StartOptions {
/// Add process service tags to the traces
#[structopt(short, long, env = "TRACING_TAGS", value_delimiter=",", parse(try_from_str = common_lib::opentelemetry::parse_key_value))]
tracing_tags: Vec<KeyValue>,

/// Maximum number of concurrent rebuilds across the cluster.
#[structopt(long)]
max_rebuilds: Option<u32>,
}

/// List of KeyValues
Expand Down Expand Up @@ -364,6 +368,10 @@ impl StartOptions {
});
self
}
/// Set the maximum number of rebuilds and return the updated options.
/// Passing `None` leaves the maximum unset.
pub fn with_max_rebuilds(mut self, max: Option<u32>) -> Self {
self.max_rebuilds = max;
self
}
}

impl CliArgs {
Expand Down
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mkShell {
tini
nvme-cli
fio
git
] ++ pkgs.lib.optional (!norust) channel.default.nightly;

LIBCLANG_PATH = "${llvmPackages_11.libclang.lib}/lib";
Expand Down
12 changes: 12 additions & 0 deletions tests/bdd/features/rebuild/feature.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Feature: Rebuilding a volume

Scenario: exceeding the maximum number of rebuilds when increasing the replica count
Given a user defined maximum number of rebuilds
And an existing published volume
Then adding a replica should fail if doing so would exceed the maximum number of rebuilds

Scenario: exceeding the maximum number of rebuilds when replacing a replica
Given a user defined maximum number of rebuilds
And an existing published volume
When a replica is faulted
Then replacing the replica should fail if doing so would exceed the maximum number of rebuilds
1 change: 0 additions & 1 deletion tests/bdd/test_ana_validate_nexus_swap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Swap ANA enabled Nexus on ANA enabled host feature tests."""
import http
from time import sleep

from pytest_bdd import (
given,
Expand Down
Loading

0 comments on commit e60a999

Please sign in to comment.