chore(bors): merge pull request #511
511: chore: wait for no shutdown nexuses r=chriswldenyer a=chriswldenyer

Before setting a node to the "drained" state, verify that none of the volumes hosted on that node still has shutdown nexuses.
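
A minimal standalone sketch of this gating rule, using illustrative type and function names rather than the actual control-plane API (the real check is in the reconciler changes below):

```rust
// A simplified model of the gating rule, not the actual control-plane types.
use std::collections::{HashMap, HashSet};

/// Shutdown-nexus count per volume, keyed by volume UUID (illustrative).
type ShutdownNexusCounts = HashMap<String, usize>;

/// A node may move from "draining" to "drained" only once every volume it was
/// draining no longer has any shutdown nexuses.
fn can_mark_drained(draining_volumes: &HashSet<String>, counts: &ShutdownNexusCounts) -> bool {
    draining_volumes
        .iter()
        .all(|vol| counts.get(vol).copied().unwrap_or(0) == 0)
}

fn main() {
    let draining: HashSet<String> = HashSet::from(["vol-1".to_string(), "vol-2".to_string()]);
    let mut counts = ShutdownNexusCounts::new();
    counts.insert("vol-1".to_string(), 1); // vol-1 still has a shutdown nexus

    assert!(!can_mark_drained(&draining, &counts)); // node stays in "draining"

    counts.insert("vol-1".to_string(), 0); // the shutdown nexus has been cleaned up
    assert!(can_mark_drained(&draining, &counts)); // node can now be marked "drained"
}
```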

Co-authored-by: chriswldenyer <[email protected]>
mayastor-bors and chriswldenyer committed May 9, 2023
2 parents a4107c6 + 614e051 commit ecdfeec
Showing 3 changed files with 262 additions and 22 deletions.
122 changes: 106 additions & 16 deletions control-plane/agents/src/bin/core/controller/reconciler/node/nexus.rs
@@ -7,11 +7,14 @@ use crate::controller::{
task_poller::{PollEvent, PollResult, PollTimer, PollerState},
};
use agents::errors::SvcError;
use std::{collections::HashSet, time::Duration};
use stor_port::types::v0::{
store::{node::NodeSpec, volume::VolumeSpec},
transport::{NodeId, RepublishVolume, VolumeId, VolumeShareProtocol},
};

const DRAINING_VOLUME_TIMEOUT_SECONDS: u64 = 120;

/// Node drain reconciler.
#[derive(Debug)]
pub(super) struct NodeNexusReconciler {
@@ -72,20 +75,77 @@ async fn republish_volume(
Ok(())
}

async fn find_shutdown_volumes(context: &PollContext, node_id: &NodeId) -> Result<(), SvcError> {
let draining_starttime = context.specs().node_draining_timestamp(node_id).await?;

let Some(draining_starttime) = draining_starttime else {
return Ok(());
};
let elapsed = draining_starttime.elapsed();
if elapsed.is_ok() && elapsed.unwrap() < Duration::from_secs(DRAINING_VOLUME_TIMEOUT_SECONDS) {
let draining_volumes = context.specs().node_draining_volumes(node_id).await?;
let mut draining_volumes_to_remove: HashSet<VolumeId> = HashSet::new();

for vi in draining_volumes {
let shutdown_nexuses = context
.registry()
.specs()
.volume_shutdown_nexuses(&vi)
.await;
if !shutdown_nexuses.is_empty() {
// if it still has shutdown nexuses
tracing::info!(
node.id = node_id.as_str(),
volume.uuid = vi.as_str(),
nexus.count = shutdown_nexuses.len(),
"Shutdown nexuses remain"
);
} else {
tracing::info!(
node.id = node_id.as_str(),
volume.uuid = vi.as_str(),
"Removing volume from the draining volume list"
);
draining_volumes_to_remove.insert(vi);
}
}
context
.specs()
.remove_node_draining_volumes(context.registry(), node_id, draining_volumes_to_remove)
.await?;
} else {
// otherwise the drain operation has timed out
context
.specs()
.remove_all_node_draining_volumes(context.registry(), node_id)
.await?;
}
Ok(())
}

/// Drain the specified node if it is in the draining state.
async fn check_and_drain_node(context: &PollContext, node_spec: &NodeSpec) -> PollResult {
if !node_spec.is_draining() {
return PollResult::Ok(PollerState::Idle);
}

let node_id = node_spec.id();
tracing::trace!(node.id = node_spec.id().as_str(), "Draining node");

// In case this pod has restarted, set the timestamp of the draining node to now.
context
.specs()
.set_draining_timestamp_if_none(node_id)
.await?;

tracing::trace!(node.id = node_id.as_str(), "Draining node");
let vol_specs = context.specs().volumes_rsc();

let mut move_failures = false;

let mut new_draining_volumes: HashSet<VolumeId> = HashSet::new();

// Iterate through all the volumes, find those with a nexus hosted on the
// node and move each one away.
// node and move each one away via republish. Add each drained volume to the
// set of draining volumes stored in the node spec.
for vol_spec in vol_specs {
match vol_spec.operation_guard() {
Ok(mut guarded_vol_spec) => {
@@ -136,6 +196,7 @@ async fn check_and_drain_node(context: &PollContext, node_spec: &NodeSpec) -> PollResult {
node.id = node_spec.id().as_str(),
"Moved volume"
);
new_draining_volumes.insert(vol_id.clone());
}
}
Err(_) => {
@@ -144,21 +205,50 @@ async fn check_and_drain_node(context: &PollContext, node_spec: &NodeSpec) -> PollResult {
}
};
}
// Change the node state to "drained"
if let Err(error) = context
.specs()
.add_node_draining_volumes(context.registry(), node_spec.id(), new_draining_volumes)
.await
{
tracing::error!(
%error,
node.id = node_id.as_str(),
"Failed to add draining volumes"
);
return PollResult::Err(error);
}
if !move_failures {
if let Err(error) = context
.specs()
.set_node_drained(context.registry(), node_spec.id())
.await
{
tracing::error!(
%error,
node.id = node_id.as_str(),
"Failed to set node to state drained"
);
return PollResult::Err(error);
// All volumes on the node are republished.
// Determine whether we can mark the node as drained by checking
// that all drained volumes do not have shutdown nexuses.
// If that is not the case, the next reconciliation loop will check again.
find_shutdown_volumes(context, node_id).await?;

match context.specs().node_draining_volume_count(node_id).await? {
// if there are no more shutdown volumes, change the node state to "drained"
0 => {
if let Err(error) = context
.specs()
.set_node_drained(context.registry(), node_id)
.await
{
tracing::error!(
%error,
node.id = node_id.as_str(),
"Failed to set node to state drained"
);
return PollResult::Err(error);
}
tracing::info!(node.id = node_id.as_str(), "Set node to state drained");
}
remaining => {
tracing::info!(
node.id = node_id.as_str(),
nexus.count = remaining,
"Shutdown nexuses remain"
);
}
}
tracing::info!(node.id = node_id.as_str(), "Set node to state drained");
}
PollResult::Ok(PollerState::Idle)
}
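
The timeout guard in `find_shutdown_volumes` is built on `SystemTime::elapsed`, which returns an error if the system clock has moved backwards since the timestamp was taken. A small standalone sketch of that pattern, with the same illustrative 120-second window:

```rust
use std::time::{Duration, SystemTime};

const DRAINING_VOLUME_TIMEOUT_SECONDS: u64 = 120;

/// Returns true while the drain started at `start` is still within its window.
/// `SystemTime::elapsed` fails if the clock moved backwards; treating that as
/// "window closed" mirrors the `is_ok() && ...` check in the diff above.
fn drain_window_open(start: SystemTime) -> bool {
    match start.elapsed() {
        Ok(elapsed) => elapsed < Duration::from_secs(DRAINING_VOLUME_TIMEOUT_SECONDS),
        Err(_) => false,
    }
}

fn main() {
    let started_now = SystemTime::now();
    assert!(drain_window_open(started_now));

    // A start time well in the past: the window is closed and the reconciler
    // would clear the remaining draining volumes instead of waiting further.
    let started_long_ago = SystemTime::now() - Duration::from_secs(600);
    assert!(!drain_window_open(started_long_ago));
}
```

When the window has closed (or the clock check fails), the reconciler stops waiting and clears all remaining draining volumes rather than blocking the drain indefinitely.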
94 changes: 93 additions & 1 deletion control-plane/agents/src/bin/core/node/specs.rs
@@ -2,14 +2,16 @@ use agents::errors::{NodeNotFound, SvcError};
use snafu::OptionExt;
use stor_port::types::v0::{
store::node::{NodeLabels, NodeSpec},
transport::{NodeId, Register},
transport::{NodeId, Register, VolumeId},
};

use crate::controller::{
registry::Registry,
resources::{operations_helper::ResourceSpecsLocked, ResourceMutex},
};

use std::{collections::HashSet, time::SystemTime};

impl ResourceSpecsLocked {
/// Create a node spec for the register request
pub(crate) async fn register_node(
@@ -183,4 +185,94 @@ impl ResourceSpecsLocked {
registry.store_obj(&drained_node_spec).await?;
Ok(drained_node_spec)
}

/// Add the given draining volumes to the node spec so their shutdown nexuses can be checked.
pub(crate) async fn add_node_draining_volumes(
&self,
registry: &Registry,
node_id: &NodeId,
draining_volumes: HashSet<VolumeId>,
) -> Result<NodeSpec, SvcError> {
let node = self.node_rsc(node_id)?;
let drained_node_spec = {
let mut locked_node = node.lock();
locked_node.add_draining_volumes(draining_volumes);
locked_node.clone()
};
registry.store_obj(&drained_node_spec).await?;
Ok(drained_node_spec)
}

/// Get the draining volumes on this node.
pub(crate) async fn node_draining_volumes(
&self,
node_id: &NodeId,
) -> Result<HashSet<VolumeId>, SvcError> {
let node = self.node_rsc(node_id)?;
let locked_node = node.lock();
Ok(locked_node.draining_volumes())
}

/// Get the number of draining volumes on this node.
pub(crate) async fn node_draining_volume_count(
&self,
node_id: &NodeId,
) -> Result<usize, SvcError> {
let node = self.node_rsc(node_id)?;
let locked_node = node.lock();
Ok(locked_node.draining_volume_count())
}

/// Remove the given draining volumes from this node.
pub(crate) async fn remove_node_draining_volumes(
&self,
registry: &Registry,
node_id: &NodeId,
draining_volumes: HashSet<VolumeId>,
) -> Result<NodeSpec, SvcError> {
let node = self.node_rsc(node_id)?;
let drained_node_spec = {
let mut locked_node = node.lock();
locked_node.remove_draining_volumes(draining_volumes);
locked_node.clone()
};
registry.store_obj(&drained_node_spec).await?;
Ok(drained_node_spec)
}

/// Remove all draining volumes from this node.
pub(crate) async fn remove_all_node_draining_volumes(
&self,
registry: &Registry,
node_id: &NodeId,
) -> Result<NodeSpec, SvcError> {
let node = self.node_rsc(node_id)?;
let drained_node_spec = {
let mut locked_node = node.lock();
locked_node.remove_all_draining_volumes();
locked_node.clone()
};
registry.store_obj(&drained_node_spec).await?;
Ok(drained_node_spec)
}

/// Set the draining timestamp on this node.
pub(crate) async fn set_draining_timestamp_if_none(
&self,
node_id: &NodeId,
) -> Result<(), SvcError> {
let node = self.node_rsc(node_id)?;
let mut locked_node = node.lock();
locked_node.set_draining_timestamp_if_none();
Ok(())
}

/// Get the draining timestamp on this node.
pub(crate) async fn node_draining_timestamp(
&self,
node_id: &NodeId,
) -> Result<Option<SystemTime>, SvcError> {
let node = self.node_rsc(node_id)?;
let locked_node = node.lock();
Ok(locked_node.draining_timestamp())
}
}
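
The new helpers in `specs.rs` all follow the same shape: take the in-memory lock in a narrow scope, mutate the spec, clone it, and only then persist the clone, so the lock guard is dropped before the store's `.await` point. A rough self-contained sketch of that pattern (the `ToyNodeSpec`/`store_obj` names and the tokio runtime are assumptions for illustration, not the real registry types):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the node spec; not the real NodeSpec.
#[derive(Clone, Debug, Default)]
struct ToyNodeSpec {
    draining_volumes: HashSet<String>,
}

/// Illustrative stand-in for persisting the spec to the registry's store.
async fn store_obj(spec: &ToyNodeSpec) {
    println!("persisting spec with {} draining volume(s)", spec.draining_volumes.len());
}

async fn add_draining_volumes(node: &Arc<Mutex<ToyNodeSpec>>, volumes: HashSet<String>) {
    // Lock only inside this block: mutate, then clone the updated spec.
    let updated = {
        let mut guard = node.lock().unwrap();
        guard.draining_volumes.extend(volumes);
        guard.clone()
    }; // guard is dropped here, before any .await
    store_obj(&updated).await;
}

#[tokio::main]
async fn main() {
    let node = Arc::new(Mutex::new(ToyNodeSpec::default()));
    add_draining_volumes(&node, HashSet::from(["vol-1".to_string()])).await;
}
```

Cloning inside the lock scope keeps the guard from being held across the asynchronous store call, which is why each helper persists a snapshot rather than the locked spec itself.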