fix: don't add replica to nexus which can't rebuild properly
Check if all nodes have the rebuild fix for the following operations:
- increase replica count on volume
- move replica
- add replica to nexus
- take snapshot on a multi-replica (nr) volume

Checking all nodes may lead to false positives, and to problems when a node has been
powered off but not removed from the pstor, but it is a very simple way of ensuring
that every node has the fix.
A potential improvement would be to check only the nodes hosting the volume's replicas.

Signed-off-by: Tiago Castro <[email protected]>
tiagolobocastro committed Jun 12, 2024
1 parent 366b497 commit 53500c0
Showing 14 changed files with 209 additions and 10 deletions.
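
The gating rule described in the commit message is spread across several call sites in the diffs below. The sketch that follows condenses when the check fires into one place; all names here (Op, needs_ancestry_fix, verify) are illustrative stand-ins rather than the control-plane's actual types, and the node set is reduced to a slice of booleans.

// Hypothetical model of the gate added by this commit: the conditions mirror the
// call sites in the diffs (attach a replica when snapshots exist, grow the replica
// count when snapshots exist, snapshot a multi-replica volume).
enum Op {
    AttachReplica { volume_has_snapshots: bool },
    SetReplicaCount { current: u8, requested: u8, volume_has_snapshots: bool },
    CreateSnapshot { num_replicas: u8 },
}

fn needs_ancestry_fix(op: &Op) -> bool {
    match op {
        Op::AttachReplica { volume_has_snapshots } => *volume_has_snapshots,
        Op::SetReplicaCount { current, requested, volume_has_snapshots } => {
            requested > current && *volume_has_snapshots
        }
        Op::CreateSnapshot { num_replicas } => *num_replicas > 1,
    }
}

// The "very simple" rule from the commit message: every registered node must
// report the fix, otherwise the operation is rejected up front.
fn verify(op: &Op, node_has_fix: &[bool]) -> Result<(), &'static str> {
    if needs_ancestry_fix(op) && !node_has_fix.iter().all(|f| *f) {
        return Err("UpgradeRequiredToRebuild");
    }
    Ok(())
}

fn main() {
    let nodes = [true, false]; // one node still runs an io-engine without the fix
    assert!(verify(&Op::CreateSnapshot { num_replicas: 2 }, &nodes).is_err());
    assert!(verify(&Op::CreateSnapshot { num_replicas: 1 }, &nodes).is_ok());
    let grow = Op::SetReplicaCount { current: 1, requested: 2, volume_has_snapshots: true };
    assert!(verify(&grow, &nodes).is_err());
    let attach = Op::AttachReplica { volume_has_snapshots: false };
    assert!(verify(&attach, &nodes).is_ok()); // no snapshots, no ancestry concern
}
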
7 changes: 7 additions & 0 deletions control-plane/agents/src/bin/core/nexus/operations_helper.rs
@@ -27,6 +27,7 @@ impl OperationGuardArc<NexusSpec> {
&mut self,
registry: &Registry,
replica: &Replica,
snapshots_present: bool,
) -> Result<(), SvcError> {
// Adding a replica to a nexus will initiate a rebuild.
// First check that we are able to start a rebuild.
@@ -38,6 +39,7 @@ impl OperationGuardArc<NexusSpec> {
nexus: self.as_ref().uuid.clone(),
replica: ReplicaUri::new(&replica.uuid, &uri),
auto_rebuild: true,
snapshots_present,
};
self.add_replica(registry, &request).await?;
Ok(())
@@ -181,6 +183,11 @@ impl OperationGuardArc<NexusSpec> {
request: &AddNexusReplica,
) -> Result<Child, SvcError> {
let node = registry.node_wrapper(&request.node).await?;

if request.snapshots_present {
registry.verify_rebuild_ancestry_fix().await?;
}

let replica = registry.specs().replica(request.replica.uuid()).await?;
// we don't persist nexus owners to pstor anymore, instead we rebuild at startup
replica.lock().owners.add_owner(&request.nexus);
13 changes: 12 additions & 1 deletion control-plane/agents/src/bin/core/node/registry.rs
@@ -1,6 +1,6 @@
use crate::controller::{registry::Registry, wrapper::NodeWrapper};
use agents::errors::SvcError;
use stor_port::types::v0::transport::{NodeId, NodeState, Register};
use stor_port::types::v0::transport::{NodeBugFix, NodeId, NodeState, Register};

use std::sync::Arc;
use tokio::sync::RwLock;
@@ -68,4 +68,15 @@ impl Registry {
self.specs().register_node(self, request).await.ok();
}
}

/// Verify if all nodes have the rebuild ancestry fix and error out otherwise.
pub async fn verify_rebuild_ancestry_fix(&self) -> Result<(), SvcError> {
if !self
.specs()
.nodes_have_fix(NodeBugFix::NexusRebuildReplicaAncestry)
{
return Err(SvcError::UpgradeRequiredToRebuild {});
}
Ok(())
}
}
1 change: 0 additions & 1 deletion control-plane/agents/src/bin/core/node/specs.rs
@@ -103,7 +103,6 @@ impl ResourceSpecsLocked {
}

/// Check if all nodes have the fix.
#[allow(dead_code)]
pub(crate) fn nodes_have_fix(&self, fix: NodeBugFix) -> bool {
self.read().nodes.values().all(|n| n.lock().has_fix(&fix))
}
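
nodes_have_fix scans every registered node spec, which is the simple rule the commit message describes and also the source of its caveat about nodes that were powered off but never removed from the pstor. Below is a sketch of that rule next to the suggested refinement (check only the nodes hosting the volume's replicas), using plain std collections instead of the registry's locked specs; the types and function names are hypothetical.

use std::collections::{HashMap, HashSet};

type NodeId = String;

// Hypothetical per-node record: the bug fixes the registered io-engine reports.
struct NodeSpec {
    fixes: HashSet<&'static str>,
}

// The rule used by this commit: every node known to the pstor must have the fix,
// including nodes that are powered off but still registered.
fn all_nodes_have_fix(nodes: &HashMap<NodeId, NodeSpec>, fix: &str) -> bool {
    nodes.values().all(|n| n.fixes.contains(fix))
}

// The refinement suggested in the commit message: only the nodes actually hosting
// the volume's replicas need the fix for the rebuild to be safe.
fn replica_nodes_have_fix(
    nodes: &HashMap<NodeId, NodeSpec>,
    replica_nodes: &[NodeId],
    fix: &str,
) -> bool {
    replica_nodes
        .iter()
        .all(|id| nodes.get(id).is_some_and(|n| n.fixes.contains(fix)))
}

fn main() {
    let fix = "NexusRebuildReplicaAncestry";
    let mut nodes = HashMap::new();
    nodes.insert("node-1".to_string(), NodeSpec { fixes: HashSet::from([fix]) });
    nodes.insert("node-2".to_string(), NodeSpec { fixes: HashSet::from([fix]) });
    // Powered off but never removed from the pstor: blocks the simple check.
    nodes.insert("node-3".to_string(), NodeSpec { fixes: HashSet::new() });

    assert!(!all_nodes_have_fix(&nodes, fix));
    assert!(replica_nodes_have_fix(
        &nodes,
        &["node-1".to_string(), "node-2".to_string()],
        fix
    ));
}
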
131 changes: 129 additions & 2 deletions control-plane/agents/src/bin/core/tests/volume/snapshot.rs
@@ -12,8 +12,8 @@ use stor_port::{
openapi::models,
transport::{
CreateReplica, CreateVolume, DestroyPool, DestroyReplica, DestroyVolume, Filter,
PublishVolume, ReplicaId, SetVolumeReplica, SnapshotId, Volume, VolumeShareProtocol,
VolumeStatus,
NodeStatus, PublishVolume, ReplicaId, SetVolumeReplica, SnapshotId, Volume,
VolumeShareProtocol, VolumeStatus,
},
},
};
@@ -589,6 +589,133 @@ async fn unknown_snapshot_garbage_collector() {
assert_eq!(snaps.snapshots.len(), 3);
}

#[tokio::test]
async fn snapshot_upgrade() {
let mb = 1024 * 1024;
let gc_period = Duration::from_millis(200);
let cluster = ClusterBuilder::builder()
.with_rest(false)
.with_agents(vec!["core"])
.with_tmpfs_pool_ix(0, 100 * mb)
.with_tmpfs_pool_ix(1, 100 * mb)
.with_cache_period("200ms")
.with_reconcile_period(gc_period, gc_period)
.with_options(|b| {
b.with_io_engines(2)
.with_idle_io_engines(1)
.with_io_engine_tag("v2.6.1")
// testing: remove me
.with_idle_io_engine_bin("~/git/mayastor/io-engine/target/debug/io-engine-fix")
})
.build()
.await
.unwrap();

let vol_cli = cluster.grpc_client().volume();

let volume_1 = vol_cli
.create(
&CreateVolume {
uuid: "1e3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(),
size: 16 * mb,
replicas: 1,
thin: false,
..Default::default()
},
None,
)
.await
.unwrap();

let volume_2 = vol_cli
.create(
&CreateVolume {
uuid: "1e3cf927-80c2-47a8-adf0-95c486bdd7b8".try_into().unwrap(),
size: 16 * mb,
replicas: 2,
thin: false,
..Default::default()
},
None,
)
.await
.unwrap();

let volume_1 = vol_cli
.publish(
&PublishVolume {
uuid: volume_1.uuid().clone(),
share: Some(VolumeShareProtocol::Nvmf),
target_node: Some(cluster.node(0)),
..Default::default()
},
None,
)
.await
.unwrap();

let _snapshot = vol_cli
.create_snapshot(
&CreateVolumeSnapshot::new(volume_1.uuid(), SnapshotId::new()),
None,
)
.await
.unwrap();
let error = vol_cli
.create_snapshot(
&CreateVolumeSnapshot::new(volume_2.uuid(), SnapshotId::new()),
None,
)
.await
.expect_err("Can't take nr snapshot");
assert_eq!(error.kind, ReplyErrorKind::FailedPrecondition);

let error = vol_cli
.set_replica(
&SetVolumeReplica {
uuid: volume_1.uuid().clone(),
replicas: 2,
},
None,
)
.await
.expect_err("No rebuild on older version!");
assert_eq!(error.kind, ReplyErrorKind::FailedPrecondition);

cluster.composer().stop(&cluster.node(0)).await.unwrap();
cluster
.wait_node_status(cluster.node(0), NodeStatus::Unknown)
.await
.unwrap();
cluster.composer().start(&cluster.node(2)).await.unwrap();
cluster
.wait_node_status(cluster.node(0), NodeStatus::Online)
.await
.unwrap();
cluster.wait_pool_online(cluster.pool(0, 0)).await.unwrap();

tokio::time::sleep(gc_period * 5).await;

let _volume = vol_cli
.set_replica(
&SetVolumeReplica {
uuid: volume_1.uuid().clone(),
replicas: 2,
},
None,
)
.await
.expect("After upgrade this should work!");

vol_cli
.create_snapshot(
&CreateVolumeSnapshot::new(volume_2.uuid(), SnapshotId::new()),
None,
)
.await
.expect("Now we can take nr snapshot");
}

#[tokio::test]
#[ignore]
async fn nr_snapshot() {
4 changes: 3 additions & 1 deletion control-plane/agents/src/bin/core/volume/operations.rs
@@ -655,7 +655,9 @@ impl ResourceReplicas for OperationGuardArc<VolumeSpec> {
.and_then(|t| registry.specs().nexus_rsc(t.nexus()))
{
let mut guard = nexus_spec.operation_guard()?;
guard.attach_replica(registry, &new_replica).await?;
guard
.attach_replica(registry, &new_replica, self.has_snapshots())
.await?;

if request.delete {
self.remove_child_replica(request.replica(), &mut guard, registry)
16 changes: 14 additions & 2 deletions control-plane/agents/src/bin/core/volume/operations_helper.rs
@@ -347,6 +347,9 @@ impl OperationGuardArc<VolumeSpec> {
.min(self.as_ref().num_replicas as usize);
let mut nexus_children = nexus.as_ref().children.len();

if self.has_snapshots() {
registry.verify_rebuild_ancestry_fix().await?;
}
let replicas = nexus_attach_candidates(self.as_ref(), nexus.as_ref(), registry).await?;

let mut result = Ok(());
@@ -374,7 +377,10 @@ impl OperationGuardArc<VolumeSpec> {
continue;
}

match nexus.attach_replica(registry, replica.state()).await {
match nexus
.attach_replica(registry, replica.state(), self.has_snapshots())
.await
{
Ok(_) => {
nexus.info_span(|| {
let state = replica.state();
@@ -588,7 +594,9 @@ impl OperationGuardArc<VolumeSpec> {
) -> Result<(), SvcError> {
if let Some(target) = &self.as_ref().target() {
let mut nexus_guard = registry.specs().nexus(target.nexus()).await?;
nexus_guard.attach_replica(registry, &replica).await
nexus_guard
.attach_replica(registry, &replica, self.has_snapshots())
.await
} else {
Ok(())
}
@@ -798,4 +806,8 @@ impl OperationGuardArc<VolumeSpec> {
Ok(false)
}
}

pub fn has_snapshots(&self) -> bool {
self.as_ref().metadata.has_snapshots()
}
}
@@ -58,6 +58,10 @@ impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
) -> Result<Self::CreateOutput, SvcError> {
let state = registry.volume_state(request.source_id()).await?;

if self.as_ref().num_replicas > 1 {
registry.verify_rebuild_ancestry_fix().await?;
}

let operation = VolumeOperation::CreateSnapshot(request.uuid().clone());
let spec_clone = self.start_update(registry, &state, operation).await?;

2 changes: 2 additions & 0 deletions control-plane/agents/src/bin/core/volume/specs.rs
@@ -1060,6 +1060,8 @@ impl SpecOperationsHelper for VolumeSpec {
resource: ResourceKind::AffinityGroup,
count: *replica_count,
})
} else if *replica_count > self.num_replicas && self.has_snapshots() {
registry.verify_rebuild_ancestry_fix().await
} else {
Ok(())
}
9 changes: 9 additions & 0 deletions control-plane/agents/src/common/errors.rs
@@ -196,6 +196,8 @@ pub enum SvcError {
Internal { details: String },
#[snafu(display("Invalid Arguments"))]
InvalidArguments {},
#[snafu(display("IoEngine upgrade is required to rebuild volume with snapshots"))]
UpgradeRequiredToRebuild {},
#[snafu(display("Invalid {}, labels: {} ", resource_kind, labels))]
InvalidLabel {
labels: String,
@@ -555,6 +557,13 @@ impl From<SvcError> for ReplyError {
extra,
},

SvcError::UpgradeRequiredToRebuild { .. } => ReplyError {
kind: ReplyErrorKind::FailedPrecondition,
resource: ResourceKind::Unknown,
source,
extra,
},

SvcError::NodeNotOnline { .. } => ReplyError {
kind: ReplyErrorKind::FailedPrecondition,
resource: ResourceKind::Node,
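
For callers, the important part of this mapping is the reply kind: FailedPrecondition tells the client that the request itself was fine but the cluster (old io-engines) is not ready for it, so retrying without upgrading will not help. A reduced sketch of that mapping with stand-in enums, not the actual SvcError/ReplyError definitions:

// Stand-ins for the error types touched in this hunk; only the mapping is modelled.
// The UpgradeRequiredToRebuild arm mirrors the diff above; the second variant and
// its mapping are illustrative only.
#[derive(Debug)]
enum SvcError {
    UpgradeRequiredToRebuild,
    InvalidArguments,
}

#[derive(Debug, PartialEq)]
enum ReplyErrorKind {
    FailedPrecondition,
    InvalidArgument,
}

fn reply_kind(err: &SvcError) -> ReplyErrorKind {
    match err {
        // Surfaced as a precondition failure rather than an internal or argument error.
        SvcError::UpgradeRequiredToRebuild => ReplyErrorKind::FailedPrecondition,
        SvcError::InvalidArguments => ReplyErrorKind::InvalidArgument,
    }
}

fn main() {
    assert_eq!(
        reply_kind(&SvcError::UpgradeRequiredToRebuild),
        ReplyErrorKind::FailedPrecondition
    );
}
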
6 changes: 6 additions & 0 deletions control-plane/stor-port/src/types/v0/store/nexus.rs
@@ -166,6 +166,12 @@ impl NexusSpec {
_ => None,
})
}
/// Get an iterator that references all the replica ids in the nexus.
pub fn replica_ids(&self) -> impl Iterator<Item = &ReplicaId> {
self.children
.iter()
.flat_map(|c| c.as_replica_ref().as_ref().map(|c| c.uuid()))
}
}

impl From<&NexusSpec> for CreateNexus {
9 changes: 8 additions & 1 deletion control-plane/stor-port/src/types/v0/store/nexus_child.rs
@@ -16,13 +16,20 @@ pub enum NexusChild {
}

impl NexusChild {
/// Return Self as ReplicaUri
/// Return Self as ReplicaUri.
pub fn as_replica(&self) -> Option<ReplicaUri> {
match &self {
NexusChild::Replica(replica) => Some(replica.clone()),
NexusChild::Uri(_) => None,
}
}
/// Return Self as a reference to `ReplicaUri`.
pub fn as_replica_ref(&self) -> Option<&ReplicaUri> {
match &self {
NexusChild::Replica(replica) => Some(replica),
NexusChild::Uri(_) => None,
}
}
/// Get the child URI
pub fn uri(&self) -> ChildUri {
match &self {
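
as_replica_ref exists so that replica_ids (added to NexusSpec above) can iterate the children without cloning each ReplicaUri, flat-mapping away the URI-only children. A standalone sketch of the same pattern with simplified types; these are not the stor-port definitions.

// Simplified stand-ins for the stor-port types: a nexus child is either a
// replica (with a uuid) or a bare URI.
struct ReplicaUri {
    uuid: String,
}

enum NexusChild {
    Replica(ReplicaUri),
    Uri(String),
}

impl NexusChild {
    // Borrowing accessor, mirroring as_replica_ref: no clone per child.
    fn as_replica_ref(&self) -> Option<&ReplicaUri> {
        match self {
            NexusChild::Replica(r) => Some(r),
            NexusChild::Uri(_) => None,
        }
    }
}

// Mirrors replica_ids: flat_map over the Option filters out URI-only children
// and yields references into the children slice.
fn replica_ids<'a>(children: &'a [NexusChild]) -> impl Iterator<Item = &'a str> + 'a {
    children
        .iter()
        .flat_map(|c| c.as_replica_ref().map(|r| r.uuid.as_str()))
}

fn main() {
    let children = vec![
        NexusChild::Replica(ReplicaUri { uuid: "r-1".into() }),
        NexusChild::Uri("nvmf://example".into()),
        NexusChild::Replica(ReplicaUri { uuid: "r-2".into() }),
    ];
    let ids: Vec<&str> = replica_ids(&children).collect();
    assert_eq!(ids, ["r-1", "r-2"]);
}
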
4 changes: 4 additions & 0 deletions control-plane/stor-port/src/types/v0/store/volume.rs
@@ -254,6 +254,10 @@ impl VolumeMetadata {
pub fn num_snapshots(&self) -> usize {
self.runtime.snapshots.len()
}
/// Check if there's any snapshot.
pub fn has_snapshots(&self) -> bool {
self.runtime.has_snapshots()
}
}

/// Volume meta information.
11 changes: 10 additions & 1 deletion control-plane/stor-port/src/types/v0/transport/replica.rs
@@ -869,15 +869,24 @@ pub struct AddNexusReplica {
pub replica: ReplicaUri,
/// Auto start rebuilding.
pub auto_rebuild: bool,
/// Snapshots are present in the other replicas.
pub snapshots_present: bool,
}
impl AddNexusReplica {
/// Return new `Self` from its properties.
pub fn new(node: &NodeId, nexus: &NexusId, replica: &ReplicaUri, auto_rebuild: bool) -> Self {
pub fn new(
node: &NodeId,
nexus: &NexusId,
replica: &ReplicaUri,
auto_rebuild: bool,
snapshots_present: bool,
) -> Self {
Self {
node: node.clone(),
nexus: nexus.clone(),
replica: replica.clone(),
auto_rebuild,
snapshots_present,
}
}
}
2 changes: 1 addition & 1 deletion utils/deployer-cluster/src/lib.rs
@@ -275,7 +275,7 @@ impl Cluster {
}
Err(())
}
/// Wait till the node is in the given status.
/// Wait till the pool is online.
pub async fn wait_pool_online(&self, pool_id: PoolId) -> Result<(), ()> {
let timeout = Duration::from_secs(2);
let start = std::time::Instant::now();
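
For context, wait_pool_online and the other wait helpers follow a simple poll-until-timeout pattern (the 2-second budget and start instant are visible in the surrounding lines). A generic sketch of that pattern, assuming a tokio runtime; the helper name and closure shape are illustrative, not the deployer-cluster API.

use std::time::{Duration, Instant};

// Re-check a condition until it holds or the timeout elapses, sleeping briefly
// between polls.
async fn wait_until<F>(mut check: F, timeout: Duration) -> Result<(), ()>
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    loop {
        if check() {
            return Ok(());
        }
        if start.elapsed() >= timeout {
            return Err(());
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    let result = wait_until(
        || {
            attempts += 1;
            attempts >= 3 // pretend the pool comes online on the third poll
        },
        Duration::from_secs(2),
    )
    .await;
    assert!(result.is_ok());
}
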
