chore(bors): merge pull request #503
503: Several Switchover fixes r=tiagolobocastro a=tiagolobocastro

    test(switchover/bdd): add robustness tests
    
    Add tests to exercise some corner cases observed while studying the bug.
    Admittedly these are not very precise, since some of the corner cases are hard to
    re-create, but they are a decent starting point and can help recreate these
    issues in a more automated fashion.
    
    Signed-off-by: Tiago Castro <[email protected]>



Co-authored-by: Tiago Castro <[email protected]>
mayastor-bors and tiagolobocastro committed May 2, 2023
2 parents db147ff + d583ae4 commit be52386
Showing 27 changed files with 1,008 additions and 306 deletions.
18 changes: 13 additions & 5 deletions control-plane/agents/src/bin/ha/cluster/etcd.rs
@@ -10,14 +10,14 @@ use tracing::{debug, error};

/// Represent object to access Etcd.
#[derive(Debug, Clone)]
pub struct EtcdStore {
pub(crate) struct EtcdStore {
store: Arc<Mutex<Etcd>>,
timeout: Duration,
}

impl EtcdStore {
/// Create a new Etcd client.
pub async fn new(endpoint: Uri, timeout: Duration) -> Result<Self, Error> {
pub(crate) async fn new(endpoint: Uri, timeout: Duration) -> Result<Self, Error> {
match tokio::time::timeout(timeout, async { Etcd::new(&endpoint.to_string()).await }).await
{
Ok(v) => {
@@ -38,7 +38,10 @@ impl EtcdStore {
}

/// Serialized write to the persistent store.
pub async fn store_obj<O: StorableObject>(&self, object: &O) -> Result<(), anyhow::Error> {
pub(crate) async fn store_obj<O: StorableObject>(
&self,
object: &O,
) -> Result<(), anyhow::Error> {
let mut store = self.store.lock().await;
match tokio::time::timeout(self.timeout, async move { store.put_obj(object).await }).await {
Ok(result) => result.map_err(Into::into),
@@ -54,7 +57,10 @@ impl EtcdStore {
}

/// Delete the object from the persistent store.
pub async fn delete_obj<O: StorableObject>(&self, object: &O) -> Result<(), anyhow::Error> {
pub(crate) async fn delete_obj<O: StorableObject>(
&self,
object: &O,
) -> Result<(), anyhow::Error> {
let mut store = self.store.lock().await;
match tokio::time::timeout(self.timeout, async move {
store.delete_kv(&object.key().key()).await
@@ -79,7 +85,9 @@

/// Get incomplete requests stored in Etcd.
/// Request with error or path published is considered a complete request.
pub async fn fetch_incomplete_requests(&self) -> Result<Vec<SwitchOverRequest>, anyhow::Error> {
pub(crate) async fn fetch_incomplete_requests(
&self,
) -> Result<Vec<SwitchOverRequest>, anyhow::Error> {
let mut store = self.store.lock().await;
let key = key_prefix_obj(StorableObjectType::SwitchOver, API_VERSION);
let store_entries = store.get_values_prefix(&key).await?;
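A note on the hunk above: every `EtcdStore` operation is wrapped in `tokio::time::timeout` so a slow or unreachable etcd cannot stall the HA cluster agent, while the visibility changes merely tighten the API to `pub(crate)`. Below is a minimal, self-contained sketch of that timeout-wrapping pattern; the `with_timeout` helper and `StoreError` type are illustrative stand-ins, not types from this crate.

```rust
use std::time::Duration;

/// Stand-in error type for the sketch (assumption; not the crate's real error).
#[derive(Debug)]
enum StoreError {
    Timeout,
    Backend(String),
}

/// Run a store operation with an upper bound on how long it may take,
/// mirroring the timeout-wrapping pattern used by `EtcdStore` above.
async fn with_timeout<T, F>(limit: Duration, op: F) -> Result<T, StoreError>
where
    F: std::future::Future<Output = Result<T, String>>,
{
    match tokio::time::timeout(limit, op).await {
        // The operation finished in time; propagate its own result.
        Ok(result) => result.map_err(StoreError::Backend),
        // The operation overran the limit; surface a timeout instead of hanging.
        Err(_elapsed) => Err(StoreError::Timeout),
    }
}

#[tokio::main]
async fn main() {
    // A fake "store write" that completes immediately.
    let write = async { Ok::<_, String>(()) };
    match with_timeout(Duration::from_secs(2), write).await {
        Ok(()) => println!("stored"),
        Err(error) => eprintln!("store failed: {error:?}"),
    }
}
```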
78 changes: 63 additions & 15 deletions control-plane/agents/src/bin/ha/cluster/nodes.rs
@@ -1,43 +1,74 @@
use crate::volume::VolumeMover;
use crate::{
switchover::{Stage, SwitchOverRequest, SwitchOverStage},
volume::VolumeMover,
};
use stor_port::{
transport_api::{ReplyError, ResourceKind},
types::v0::transport::NodeId,
};

use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use stor_port::types::v0::transport::NodeId;
use tokio::sync::Mutex;
use tracing::info;

/// A record about a failed path reported by the ha node agent.
#[derive(Debug)]
struct PathRecord {
_socket: SocketAddr,
stage: SwitchOverStage,
}
impl PathRecord {
fn stage(&self) -> Stage {
self.stage.read()
}
}

/// Store node information and reported failed path.
#[derive(Debug, Default, Clone)]
pub struct NodeList {
pub(crate) struct NodeList {
list: Arc<Mutex<HashMap<NodeId, SocketAddr>>>,
failed_path: Arc<Mutex<HashMap<String, SocketAddr>>>,
failed_path: Arc<Mutex<HashMap<String, PathRecord>>>,
}

impl NodeList {
/// Get a new `Self`.
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self::default()
}

/// Register node and its endpoint.
/// If the node is already registered then update its details.
pub async fn register_node(&self, name: NodeId, endpoint: SocketAddr) {
pub(crate) async fn register_node(&self, name: NodeId, endpoint: SocketAddr) {
let mut list = self.list.lock().await;
list.insert(name, endpoint);
}

/// Remove path from failed_path list.
pub async fn remove_failed_path(&self, path: String) {
pub(crate) async fn remove_failed_path(&self, path: &str) {
let mut failed_path = self.failed_path.lock().await;
failed_path.remove(&path);
failed_path.remove(path);
}
/// Add a failed switchover request to the failed paths.
/// Useful when reloading from the pstor after a crash/restart.
pub(crate) async fn insert_failed_request(&self, request: &SwitchOverRequest) {
let record = PathRecord {
_socket: request.socket(),
stage: request.stage_arc(),
};
self.failed_path
.lock()
.await
.insert(request.nqn().into(), record);
}

/// Send request to the switchover engine for the reported node and path.
pub async fn report_failed_path(
pub(crate) async fn report_failed_path(
self,
node: NodeId,
path: String,
mover: VolumeMover,
endpoint: SocketAddr,
) -> Result<(), anyhow::Error> {
) -> Result<(), ReplyError> {
// Check if node is registered in the hashmap. Register if not.
self.list
.lock()
@@ -47,14 +78,31 @@ impl NodeList {

let mut failed_path = self.failed_path.lock().await;

if failed_path.get(&path).is_some() {
anyhow::bail!("Path {} is already reported for switchover", path);
};
if let Some(record) = failed_path.get(&path) {
return match record.stage() {
Stage::ReplacePath | Stage::DeleteTarget | Stage::Successful | Stage::Errored => {
Err(ReplyError::failed_precondition(
ResourceKind::NvmePath,
path,
"Path is already reported for switchover".to_owned(),
))
}
Stage::Init | Stage::RepublishVolume => Err(ReplyError::already_exist(
ResourceKind::NvmePath,
path,
"Path is already reported for switchover".to_owned(),
)),
};
}

info!(node.id=%node, %path, "Sending switchover");

mover.switchover(node, endpoint, path.clone()).await?;
failed_path.insert(path, endpoint);
let stage = mover.switchover(node, endpoint, path.clone()).await?;
let record = PathRecord {
_socket: endpoint,
stage,
};
failed_path.insert(path, record);
Ok(())
}
}
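The change above replaces the bare `SocketAddr` entry in `failed_path` with a `PathRecord` carrying a live `SwitchOverStage` handle, so a repeated report for the same NQN is answered according to how far the in-flight switchover has progressed, and `insert_failed_request` lets requests reloaded from the pstor repopulate the map after a restart. Here is a rough sketch of what such a shared, readable stage handle could look like; this is an assumption about the shape of `SwitchOverStage`, whose real definition lives in the switchover module and is not part of this diff.

```rust
use std::sync::{Arc, RwLock};

/// Stages of a switchover request (names taken from the diff above).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    Init,
    RepublishVolume,
    ReplacePath,
    DeleteTarget,
    Successful,
    Errored,
}

/// A cheaply clonable handle that lets the node list observe the engine's
/// progress. Assumed shape: the real `SwitchOverStage` may differ.
#[derive(Debug, Clone)]
struct SwitchOverStage(Arc<RwLock<Stage>>);

impl SwitchOverStage {
    fn new(stage: Stage) -> Self {
        Self(Arc::new(RwLock::new(stage)))
    }
    /// Read the current stage (mirrors `PathRecord::stage` in the diff).
    fn read(&self) -> Stage {
        *self.0.read().expect("stage lock poisoned")
    }
    /// The switchover engine advances the stage as the request progresses.
    fn set(&self, stage: Stage) {
        *self.0.write().expect("stage lock poisoned") = stage;
    }
}

fn main() {
    let stage = SwitchOverStage::new(Stage::Init);
    let observer = stage.clone();
    stage.set(Stage::RepublishVolume);
    // A duplicate report at this early stage would map to an "already exists"
    // reply; later stages map to "failed precondition", as in the diff above.
    assert_eq!(observer.read(), Stage::RepublishVolume);
}
```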
38 changes: 19 additions & 19 deletions control-plane/agents/src/bin/ha/cluster/server.rs
@@ -7,7 +7,10 @@ use grpc::{
},
};
use std::{net::SocketAddr, sync::Arc};
use stor_port::transport_api::{ReplyError, ReplyErrorKind, ResourceKind};
use stor_port::{
transport_api::{ReplyError, ResourceKind},
types::v0::transport::FailedPathsResponse,
};

/// High-level object that represents HA Cluster agent gRPC server.
pub(crate) struct ClusterAgent {
@@ -69,37 +72,34 @@ impl ClusterAgentOperations for ClusterAgentSvc {
&self,
request: &dyn ReportFailedPathsInfo,
_context: Option<Context>,
) -> Result<(), ReplyError> {
let mut v: Vec<(String, String)> = Vec::new();
) -> Result<FailedPathsResponse, ReplyError> {
let mut report = FailedPathsResponse::default();

for x in request.failed_paths().into_iter() {
for path in request.failed_paths().into_iter() {
let nodes = self.nodes.clone();
match nodes
.report_failed_path(
request.node().into(),
x.target_nqn().to_string(),
path.target_nqn().to_string(),
self.mover.clone(),
request.endpoint(),
)
.await
{
Ok(_) => continue,
Err(err) => v.push((x.target_nqn().to_string(), err.to_string())),
Ok(_) => {
report.push(tonic::Code::Ok, path.target_nqn());
}
Err(error) => {
let status = tonic::Status::from(error);
report.push(status.code(), path.target_nqn());
}
}
}

if !v.is_empty() {
let mut e = ReplyError {
kind: ReplyErrorKind::WithMessage,
resource: ResourceKind::Unknown,
source: "".into(),
extra: "".into(),
};
v.iter().for_each(|x| {
e.extend(&x.0, &x.1);
});
return Err(e);
if report.is_all_ok() {
return Ok(FailedPathsResponse::default());
}
Ok(())

Ok(report)
}
}
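With the hunk above, `report_failed_nvme_paths` no longer collapses all failures into a single aggregated `ReplyError`; it returns a `FailedPathsResponse` recording a `tonic::Code` per reported path. The sketch below is a minimal stand-in for that response type, assuming a simple `(code, nqn)` list with `push` and `is_all_ok`; the real type lives in `stor_port` and may carry more detail.

```rust
/// Minimal stand-in for `FailedPathsResponse` (assumption; the real type is
/// defined in `stor_port` and is not shown in this diff).
#[derive(Debug, Default)]
struct FailedPathsResponse {
    paths: Vec<(tonic::Code, String)>,
}

impl FailedPathsResponse {
    /// Record the outcome for a single reported path.
    fn push(&mut self, code: tonic::Code, nqn: &str) {
        self.paths.push((code, nqn.to_string()));
    }
    /// True when every reported path was accepted for switchover.
    fn is_all_ok(&self) -> bool {
        self.paths.iter().all(|(code, _)| *code == tonic::Code::Ok)
    }
}

fn main() {
    let mut report = FailedPathsResponse::default();
    report.push(tonic::Code::Ok, "nqn.2019-05.io.openebs:vol-1");
    report.push(tonic::Code::AlreadyExists, "nqn.2019-05.io.openebs:vol-2");
    // One path was rejected, so the caller gets the detailed per-path report.
    assert!(!report.is_all_ok());
}
```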