diff --git a/.github/workflows/release_artifacts.yml b/.github/workflows/release_artifacts.yml index c09159b22..c7abd1887 100644 --- a/.github/workflows/release_artifacts.yml +++ b/.github/workflows/release_artifacts.yml @@ -4,6 +4,7 @@ on: branches: - master - 'release/**' + - 'hotfix-v**' jobs: kubectl-plugin: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 41de7cf45..a44ba0040 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -53,10 +53,3 @@ repos: args: [ "--changes" ] pass_filenames: false language: system - - id: helm-develop-deploy - name: Helm Generator - description: Ensures the deploy is updated with the develop yamls - entry: ./scripts/generate-deploy-yamls.sh - args: [ "-c", "develop" ] - pass_filenames: false - language: system diff --git a/chart/templates/csi-deployment.yaml b/chart/templates/csi-deployment.yaml index ac42b5b3c..a2f797ecc 100644 --- a/chart/templates/csi-deployment.yaml +++ b/chart/templates/csi-deployment.yaml @@ -57,7 +57,7 @@ spec: imagePullPolicy: {{ .Values.mayastorCP.pullPolicy }} args: - "--csi-socket=/var/lib/csi/sockets/pluginproxy/csi.sock" - - "--rest-endpoint=http://$(REST_SERVICE_HOST):8081"{{ if .Values.base.jaeger.enabled }} + - "--rest-endpoint=http://rest:8081"{{ if .Values.base.jaeger.enabled }} - "--jaeger={{ .Values.base.jaeger.agent.name }}:{{ .Values.base.jaeger.agent.port }}"{{ end }} env: - name: RUST_LOG diff --git a/chart/templates/msp-deployment.yaml b/chart/templates/msp-deployment.yaml index 1483d9e9b..ced77d79d 100644 --- a/chart/templates/msp-deployment.yaml +++ b/chart/templates/msp-deployment.yaml @@ -26,7 +26,7 @@ spec: image: {{ .Values.mayastorCP.registry }}mayadata/mcp-msp-operator:{{ .Values.mayastorCP.tag }} imagePullPolicy: {{ .Values.mayastorCP.pullPolicy }} args: - - "-e http://$(REST_SERVICE_HOST):8081" + - "-e http://rest:8081" - "--interval={{ .Values.base.cache_poll_period }}"{{ if .Values.base.jaeger.enabled }} - "--jaeger={{ .Values.base.jaeger.agent.name }}:{{ .Values.base.jaeger.agent.port }}"{{ end }} env: diff --git a/common/src/opentelemetry.rs b/common/src/opentelemetry.rs index 7e1e73769..b16eee288 100644 --- a/common/src/opentelemetry.rs +++ b/common/src/opentelemetry.rs @@ -19,3 +19,18 @@ pub fn default_tracing_tags(git_commit: &str, cargo_version: &str) -> Vec for EtcdSingletonLock { #[async_trait::async_trait] impl LeaseLockKeeperClocking for EtcdSingletonLock { - #[tracing::instrument(skip(self, state), err)] + #[tracing::instrument(level = "trace", skip(self, state), err)] async fn clock(&mut self, mut state: KeepAlive) -> LockStatesResult { state .keeper diff --git a/common/src/types/v0/message_bus/replica.rs b/common/src/types/v0/message_bus/replica.rs index 3fc58c0bc..b9f952419 100644 --- a/common/src/types/v0/message_bus/replica.rs +++ b/common/src/types/v0/message_bus/replica.rs @@ -158,6 +158,7 @@ pub struct CreateReplica { #[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] pub struct ReplicaOwners { volume: Option, + #[serde(skip)] nexuses: Vec, } impl ReplicaOwners { diff --git a/common/src/types/v0/store/nexus_persistence.rs b/common/src/types/v0/store/nexus_persistence.rs index 209a6e01e..d1f5007c4 100644 --- a/common/src/types/v0/store/nexus_persistence.rs +++ b/common/src/types/v0/store/nexus_persistence.rs @@ -26,6 +26,10 @@ impl NexusInfo { None => false, } } + /// Check if no replica is healthy + pub fn no_healthy_replicas(&self) -> bool { + self.children.iter().all(|c| !c.healthy) || self.children.is_empty() + } } /// 
Definition of the child information that gets saved in the persistent diff --git a/control-plane/agents/common/src/errors.rs b/control-plane/agents/common/src/errors.rs index ae7574437..138d18b6e 100644 --- a/control-plane/agents/common/src/errors.rs +++ b/control-plane/agents/common/src/errors.rs @@ -186,6 +186,8 @@ pub enum SvcError { ReplicaCreateNumber { id: String }, #[snafu(display("No online replicas are available for Volume '{}'", id))] NoOnlineReplicas { id: String }, + #[snafu(display("No healthy replicas are available for Volume '{}'", id))] + NoHealthyReplicas { id: String }, #[snafu(display("Entry with key '{}' not found in the persistent store.", key))] StoreMissingEntry { key: String }, #[snafu(display("The uuid '{}' for kind '{}' is not valid.", uuid, kind.to_string()))] @@ -514,6 +516,12 @@ impl From for ReplyError { source: desc.to_string(), extra: error.full_string(), }, + SvcError::NoHealthyReplicas { .. } => ReplyError { + kind: ReplyErrorKind::VolumeNoReplicas, + resource: ResourceKind::Volume, + source: desc.to_string(), + extra: error.full_string(), + }, SvcError::ReplicaCreateNumber { .. } => ReplyError { kind: ReplyErrorKind::ReplicaCreateNumber, resource: ResourceKind::Volume, diff --git a/control-plane/agents/core/src/core/grpc.rs b/control-plane/agents/core/src/core/grpc.rs index b510873e3..da4160428 100644 --- a/control-plane/agents/core/src/core/grpc.rs +++ b/control-plane/agents/core/src/core/grpc.rs @@ -56,13 +56,27 @@ impl GrpcContext { comms_timeouts: comms_timeouts.clone(), }) } - pub(crate) async fn lock(&self) -> tokio::sync::OwnedMutexGuard<()> { + /// Override the timeout config in the context for the given request + fn override_timeout(&mut self, request: Option) { + let timeout = request + .map(|r| r.timeout(self.comms_timeouts.request(), &bus())) + .unwrap_or_else(|| self.comms_timeouts.request()); + + self.endpoint = self + .endpoint + .clone() + .connect_timeout(self.comms_timeouts.connect() + Duration::from_millis(500)) + .timeout(timeout); + } + pub(crate) async fn lock(&self) -> GrpcLockGuard { self.lock.clone().lock_owned().await } pub(crate) async fn connect(&self) -> Result { GrpcClient::new(self).await } - pub(crate) async fn connect_locked(&self) -> Result { + pub(crate) async fn connect_locked( + &self, + ) -> Result { GrpcClientLocked::new(self).await } } @@ -72,7 +86,7 @@ impl GrpcContext { pub(crate) struct GrpcClient { context: GrpcContext, /// gRPC Mayastor Client - pub(crate) client: MayaClient, + pub(crate) mayastor: MayaClient, } pub(crate) type MayaClient = MayastorClient; impl GrpcClient { @@ -96,23 +110,48 @@ impl GrpcClient { Ok(Self { context: context.clone(), - client, + mayastor: client, }) } } -/// Wrapper over all gRPC Clients types with implicit locking for serialization +/// Async Lock guard for gRPC operations. +/// It's used by the GrpcClientLocked to ensure there's only one operation in progress +/// at a time while still allowing for multiple gRPC clients. +type GrpcLockGuard = tokio::sync::OwnedMutexGuard<()>; + +/// Wrapper over all gRPC Clients types with implicit locking for serialization. pub(crate) struct GrpcClientLocked { /// gRPC auto CRUD guard lock - _lock: tokio::sync::OwnedMutexGuard<()>, + _lock: GrpcLockGuard, client: GrpcClient, } impl GrpcClientLocked { - pub(crate) async fn new(context: &GrpcContext) -> Result { - let client = GrpcClient::new(context).await?; + /// Create new locked client from the given context + /// A connection is established with the timeouts specified from the context. 
+ /// Only one `Self` is allowed at a time by making use of a lock guard. + pub(crate) async fn new(context: &GrpcContext) -> Result { + let _lock = context.lock().await; + + let client = match GrpcClient::new(context).await { + Ok(client) => client, + Err(error) => return Err((_lock, error)), + }; + + Ok(Self { _lock, client }) + } + /// Reconnect the client to use for the given request + /// This is useful when we want to issue the next gRPC using a different timeout + /// todo: tower should allow us to handle this better by keeping the same "backend" client + /// but modifying the timeout layer? + pub(crate) async fn reconnect(self, request: R) -> Result { + let mut context = self.context.clone(); + context.override_timeout(Some(request)); + + let client = GrpcClient::new(&context).await?; Ok(Self { - _lock: context.lock().await, + _lock: self._lock, client, }) } diff --git a/control-plane/agents/core/src/core/reconciler/mod.rs b/control-plane/agents/core/src/core/reconciler/mod.rs index a51feffb6..c9951375d 100644 --- a/control-plane/agents/core/src/core/reconciler/mod.rs +++ b/control-plane/agents/core/src/core/reconciler/mod.rs @@ -48,7 +48,10 @@ impl ReconcilerControl { } /// Send an event signal to the poller's main loop + /// todo: don't requeque duplicate events pub(crate) async fn notify(&self, event: PollEvent) { - self.event_channel.send(event).await.ok(); + if let Err(error) = self.event_channel.try_send(event) { + tracing::warn!(error=?error, "Failed to send event to reconcile worker"); + } } } diff --git a/control-plane/agents/core/src/core/reconciler/nexus/garbage_collector.rs b/control-plane/agents/core/src/core/reconciler/nexus/garbage_collector.rs index c3302c479..a34ca10b1 100644 --- a/control-plane/agents/core/src/core/reconciler/nexus/garbage_collector.rs +++ b/control-plane/agents/core/src/core/reconciler/nexus/garbage_collector.rs @@ -11,6 +11,7 @@ use common_lib::types::v0::{ use crate::core::task_poller::PollTriggerEvent; use parking_lot::Mutex; use std::sync::Arc; +use tracing::Instrument; /// Nexus Garbage Collector reconciler #[derive(Debug)] @@ -110,13 +111,25 @@ async fn destroy_disowned_nexus( if nexus_clone.managed && !nexus_clone.owned() { let node_online = matches!(context.registry().get_node_wrapper(&nexus_clone.node).await, Ok(node) if node.read().await.is_online()); if node_online { - nexus_clone.warn_span(|| tracing::warn!("Attempting to destroy disowned nexus")); - let request = DestroyNexus::from(nexus_clone.clone()); - context - .specs() - .destroy_nexus(context.registry(), &request, true, mode) - .await?; - nexus_clone.info_span(|| tracing::info!("Successfully destroyed disowned nexus")); + async { + nexus_clone.warn_span(|| tracing::warn!("Attempting to destroy disowned nexus")); + let request = DestroyNexus::from(nexus_clone.clone()); + match context + .specs() + .destroy_nexus(context.registry(), &request, true, mode) + .await { + Ok(_) => { + nexus_clone.info_span(|| tracing::info!("Successfully destroyed disowned nexus")); + Ok(()) + } + Err(error) => { + nexus_clone.error_span(|| tracing::error!(error = %error, "Failed to destroy disowned nexus")); + Err(error) + } + } + } + .instrument(tracing::info_span!("destroy_disowned_nexus", nexus.uuid = %nexus_clone.uuid, request.reconcile = true)) + .await?; } } diff --git a/control-plane/agents/core/src/core/reconciler/nexus/mod.rs b/control-plane/agents/core/src/core/reconciler/nexus/mod.rs index a32f109b2..6f82b88e8 100644 --- a/control-plane/agents/core/src/core/reconciler/nexus/mod.rs +++ 
b/control-plane/agents/core/src/core/reconciler/nexus/mod.rs @@ -25,8 +25,12 @@ use common_lib::{ }; use garbage_collector::GarbageCollector; +use crate::core::wrapper::NodeWrapper; +use common_lib::types::v0::message_bus::NexusStatus; use parking_lot::Mutex; use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::RwLock; +use tracing::Instrument; /// Nexus Reconciler loop #[derive(Debug)] @@ -98,7 +102,7 @@ async fn nexus_reconciler( /// Find and removes faulted children from the given nexus /// If the child is a replica it also disowns and destroys it -#[tracing::instrument(skip(nexus_spec, context, mode), fields(nexus.uuid = %nexus_spec.lock().uuid))] +#[tracing::instrument(skip(nexus_spec, context, mode), level = "trace", fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] pub(super) async fn faulted_children_remover( nexus_spec: &Arc>, context: &PollContext, @@ -107,25 +111,38 @@ pub(super) async fn faulted_children_remover( let nexus_spec_clone = nexus_spec.lock().clone(); let nexus_uuid = nexus_spec_clone.uuid.clone(); let nexus_state = context.registry().get_nexus(&nexus_uuid).await?; - for child in nexus_state.children.iter().filter(|c| c.state.faulted()) { - nexus_spec_clone - .warn_span(|| tracing::warn!("Attempting to remove faulted child '{}'", child.uri)); - if let Err(error) = context - .specs() - .remove_nexus_child_by_uri(context.registry(), &nexus_state, &child.uri, true, mode) - .await - { - nexus_spec_clone.error(&format!( - "Failed to remove faulted child '{}', error: '{}'", - child.uri, - error.full_string(), - )); - } else { - nexus_spec_clone.info(&format!( - "Successfully removed faulted child '{}'", - child.uri, - )); + let child_count = nexus_state.children.len(); + + // Remove faulted children only from a degraded nexus with other healthy children left + if nexus_state.status == NexusStatus::Degraded && child_count > 1 { + async { + for child in nexus_state.children.iter().filter(|c| c.state.faulted()) { + nexus_spec_clone + .warn_span(|| tracing::warn!("Attempting to remove faulted child '{}'", child.uri)); + if let Err(error) = context + .specs() + .remove_nexus_child_by_uri(context.registry(), &nexus_state, &child.uri, true, mode) + .await + { + nexus_spec_clone.error_span(|| { + tracing::error!( + error = %error.full_string().as_str(), + child.uri = %child.uri.as_str(), + "Failed to remove faulted child" + ) + }); + } else { + nexus_spec_clone.info_span(|| { + tracing::info!( + child.uri = %child.uri.as_str(), + "Successfully removed faulted child", + ) + }); + } + } } + .instrument(tracing::info_span!("faulted_children_remover", nexus.uuid = %nexus_uuid, request.reconcile = true)) + .await } PollResult::Ok(PollerState::Idle) @@ -133,7 +150,7 @@ pub(super) async fn faulted_children_remover( /// Find and removes unknown children from the given nexus /// If the child is a replica it also disowns and destroys it -#[tracing::instrument(skip(nexus_spec, context, mode), fields(nexus.uuid = %nexus_spec.lock().uuid))] +#[tracing::instrument(skip(nexus_spec, context, mode), level = "trace", fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] pub(super) async fn unknown_children_remover( nexus_spec: &Arc>, context: &PollContext, @@ -145,25 +162,42 @@ pub(super) async fn unknown_children_remover( let state_children = nexus_state.children.iter(); let spec_children = nexus_spec.lock().children.clone(); - for child in state_children.filter(|c| !spec_children.iter().any(|spec| spec.uri() == c.uri)) { - nexus_spec_clone - 
.warn_span(|| tracing::warn!("Attempting to remove unknown child '{}'", child.uri)); - if let Err(error) = context - .specs() - .remove_nexus_child_by_uri(context.registry(), &nexus_state, &child.uri, false, mode) - .await - { - nexus_spec_clone.error(&format!( - "Failed to remove unknown child '{}', error: '{}'", - child.uri, - error.full_string(), - )); - } else { - nexus_spec_clone.info(&format!( - "Successfully removed unknown child '{}'", - child.uri, - )); + let unknown_children = state_children + .filter(|c| !spec_children.iter().any(|spec| spec.uri() == c.uri)) + .cloned() + .collect::>(); + + if !unknown_children.is_empty() { + async move { + for child in unknown_children { + nexus_spec_clone + .warn_span(|| tracing::warn!("Attempting to remove unknown child '{}'", child.uri)); + if let Err(error) = context + .specs() + .remove_nexus_child_by_uri( + context.registry(), + &nexus_state, + &child.uri, + false, + mode, + ) + .await + { + nexus_spec_clone.error(&format!( + "Failed to remove unknown child '{}', error: '{}'", + child.uri, + error.full_string(), + )); + } else { + nexus_spec_clone.info(&format!( + "Successfully removed unknown child '{}'", + child.uri, + )); + } + } } + .instrument(tracing::info_span!("unknown_children_remover", nexus.uuid = %nexus_uuid, request.reconcile = true)) + .await } PollResult::Ok(PollerState::Idle) @@ -172,7 +206,7 @@ pub(super) async fn unknown_children_remover( /// Find missing children from the given nexus /// They are removed from the spec as we don't know why they got removed, so it's safer /// to just disown and destroy them. -#[tracing::instrument(skip(nexus_spec, context, mode), fields(nexus.uuid = %nexus_spec.lock().uuid))] +#[tracing::instrument(skip(nexus_spec, context, mode), level = "trace", fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] pub(super) async fn missing_children_remover( nexus_spec: &Arc>, context: &PollContext, @@ -220,94 +254,113 @@ pub(super) async fn missing_children_remover( /// Recreate the given nexus on its associated node /// Only healthy and online replicas are reused in the nexus recreate request -#[tracing::instrument(skip(nexus_spec, context), fields(nexus.uuid = %nexus_spec.lock().uuid))] pub(super) async fn missing_nexus_recreate( nexus_spec: &Arc>, context: &PollContext, mode: OperationMode, ) -> PollResult { - let mut nexus = nexus_spec.lock().clone(); + let nexus = nexus_spec.lock().clone(); let nexus_uuid = nexus.uuid.clone(); if context.registry().get_nexus(&nexus_uuid).await.is_ok() { return PollResult::Ok(PollerState::Idle); } - let warn_missing = |nexus_spec: &NexusSpec, node_status: NodeStatus| { - nexus_spec.debug_span(|| { - tracing::debug!( - node.uuid = %nexus_spec.node, - node.status = %node_status.to_string(), - "Attempted to recreate missing nexus, but the node is not online" - ) - }); - }; - - let node = match context.registry().get_node_wrapper(&nexus.node).await { - Ok(node) if !node.read().await.is_online() => { - let node_status = node.read().await.status().clone(); - warn_missing(&nexus, node_status); - return PollResult::Ok(PollerState::Idle); - } - Err(_) => { - warn_missing(&nexus, NodeStatus::Unknown); - return PollResult::Ok(PollerState::Idle); - } - Ok(node) => node, - }; + #[tracing::instrument(skip(nexus, context, mode), fields(nexus.uuid = %nexus.uuid, request.reconcile = true))] + async fn missing_nexus_recreate( + mut nexus: NexusSpec, + context: &PollContext, + mode: OperationMode, + ) -> PollResult { + let warn_missing = |nexus_spec: &NexusSpec, 
node_status: NodeStatus| { + nexus_spec.debug_span(|| { + tracing::debug!( + node.uuid = %nexus_spec.node, + node.status = %node_status.to_string(), + "Attempted to recreate missing nexus, but the node is not online" + ) + }); + }; + + let node = match context.registry().get_node_wrapper(&nexus.node).await { + Ok(node) if !node.read().await.is_online() => { + let node_status = node.read().await.status(); + warn_missing(&nexus, node_status); + return PollResult::Ok(PollerState::Idle); + } + Err(_) => { + warn_missing(&nexus, NodeStatus::Unknown); + return PollResult::Ok(PollerState::Idle); + } + Ok(node) => node, + }; - nexus.warn_span(|| tracing::warn!("Attempting to recreate missing nexus")); + nexus.warn_span(|| tracing::warn!("Attempting to recreate missing nexus")); - let children = get_healthy_nexus_children(&nexus, context.registry()).await?; + let children = get_healthy_nexus_children(&nexus, context.registry()).await?; - let mut nexus_replicas = vec![]; - for item in children.candidates() { - // just in case the replica gets somehow shared/unshared? - match context - .specs() - .make_replica_accessible(context.registry(), item.state(), &nexus.node, mode) - .await - { - Ok(uri) => { - nexus_replicas.push(NexusChild::Replica(ReplicaUri::new( - &item.spec().uuid, - &uri, - ))); - } - Err(error) => { - nexus.error_span(|| { - tracing::error!(nexus.node=%nexus.node, replica.uuid = %item.spec().uuid, error=%error, "Failed to make the replica available on the nexus node"); - }); + let mut nexus_replicas = vec![]; + for item in children.candidates() { + // just in case the replica gets somehow shared/unshared? + match context + .specs() + .make_replica_accessible(context.registry(), item.state(), &nexus.node, mode) + .await + { + Ok(uri) => { + nexus_replicas.push(NexusChild::Replica(ReplicaUri::new( + &item.spec().uuid, + &uri, + ))); + } + Err(error) => { + nexus.error_span(|| { + tracing::error!(nexus.node=%nexus.node, replica.uuid = %item.spec().uuid, error=%error, "Failed to make the replica available on the nexus node"); + }); + } } } - } - nexus.children = match children { - HealthyChildItems::One(_) => nexus_replicas.first().into_iter().cloned().collect(), - HealthyChildItems::All(_) => nexus_replicas, - }; + nexus.children = match children { + HealthyChildItems::One(_, _) => nexus_replicas.first().into_iter().cloned().collect(), + HealthyChildItems::All(_, _) => nexus_replicas, + }; - if nexus.children.is_empty() { - nexus.warn_span(|| tracing::warn!("No nexus children are available. Will retry later...")); - return PollResult::Ok(PollerState::Idle); - } + if nexus.children.is_empty() { + if let Some(info) = children.nexus_info() { + if info.no_healthy_replicas() { + nexus.error_span(|| { + tracing::error!("No healthy replicas found - manual intervention required") + }); + return PollResult::Ok(PollerState::Idle); + } + } - match node.create_nexus(&CreateNexus::from(&nexus)).await { - Ok(_) => { - nexus.info_span(|| tracing::info!("Nexus successfully recreated")); - PollResult::Ok(PollerState::Idle) + nexus.warn_span(|| { + tracing::warn!("No nexus children are available. 
Will retry later...") + }); + return PollResult::Ok(PollerState::Idle); } - Err(error) => { - nexus.error_span(|| tracing::error!(error=%error, "Failed to recreate the nexus")); - Err(error) + + match node.create_nexus(&CreateNexus::from(&nexus)).await { + Ok(_) => { + nexus.info_span(|| tracing::info!("Nexus successfully recreated")); + PollResult::Ok(PollerState::Idle) + } + Err(error) => { + nexus.error_span(|| tracing::error!(error=%error, "Failed to recreate the nexus")); + Err(error) + } } } + + missing_nexus_recreate(nexus, context, mode).await } /// Fixup the nexus share protocol if it does not match what the specs says /// If the nexus is shared but the protocol is not the same as the spec, then we must first /// unshare the nexus, and then share it via the correct protocol -#[tracing::instrument(skip(nexus_spec, context), fields(nexus.uuid = %nexus_spec.lock().uuid))] +#[tracing::instrument(skip(nexus_spec, context), level = "debug", fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] pub(super) async fn fixup_nexus_protocol( nexus_spec: &Arc>, context: &PollContext, @@ -358,3 +411,54 @@ pub(super) async fn fixup_nexus_protocol( PollResult::Ok(PollerState::Idle) } + +/// Given a published self-healing volume +/// When its nexus target is faulted +/// And one or more of its healthy replicas are back online +/// Then the nexus shall be removed from its associated node +pub(super) async fn faulted_nexus_remover( + nexus_spec: &Arc>, + context: &PollContext, + _mode: OperationMode, +) -> PollResult { + let nexus = nexus_spec.lock().clone(); + let nexus_uuid = nexus.uuid.clone(); + + if let Ok(nexus_state) = context.registry().get_nexus(&nexus_uuid).await { + if nexus_state.status == NexusStatus::Faulted { + let healthy_children = get_healthy_nexus_children(&nexus, context.registry()).await?; + let node = context.registry().get_node_wrapper(&nexus.node).await?; + + #[tracing::instrument(skip(nexus, node), fields(nexus.uuid = %nexus.uuid, request.reconcile = true))] + async fn faulted_nexus_remover( + nexus: NexusSpec, + node: Arc>, + ) -> PollResult { + nexus.warn( + "Removing Faulted Nexus so it can be recreated with its healthy children", + ); + + // destroy the nexus - it will be recreated by the missing_nexus reconciler! 
+ match node.destroy_nexus(&nexus.clone().into()).await { + Ok(_) => { + nexus.info("Faulted Nexus successfully removed"); + } + Err(error) => { + nexus.info_span(|| tracing::error!(error=%error.full_string(), "Failed to remove Faulted Nexus")); + return Err(error); + } + } + + PollResult::Ok(PollerState::Idle) + } + + let node_online = node.read().await.is_online(); + // only remove the faulted nexus when the children are available again + if node_online && !healthy_children.candidates().is_empty() { + faulted_nexus_remover(nexus, node).await?; + } + } + } + + PollResult::Ok(PollerState::Idle) +} diff --git a/control-plane/agents/core/src/core/reconciler/poller.rs b/control-plane/agents/core/src/core/reconciler/poller.rs index 1d7c81052..63405ab78 100644 --- a/control-plane/agents/core/src/core/reconciler/poller.rs +++ b/control-plane/agents/core/src/core/reconciler/poller.rs @@ -34,7 +34,9 @@ impl ReconcilerWorker { Box::new(replica::ReplicaReconciler::new()), ]; - let event_channel = tokio::sync::mpsc::channel(poll_targets.len()); + // if events are sent before the worker is started they may fill up the buffer + // from which point messages will be dropped + let event_channel = tokio::sync::mpsc::channel(poll_targets.len() * 2); let shutdown_channel = tokio::sync::mpsc::channel(1); Self { poll_targets, diff --git a/control-plane/agents/core/src/core/reconciler/pool/mod.rs b/control-plane/agents/core/src/core/reconciler/pool/mod.rs index 08388625e..a74c51d5d 100644 --- a/control-plane/agents/core/src/core/reconciler/pool/mod.rs +++ b/control-plane/agents/core/src/core/reconciler/pool/mod.rs @@ -9,6 +9,7 @@ use common_lib::types::v0::{ }; use parking_lot::Mutex; use std::sync::Arc; +use tracing_futures::Instrument; /// Pool Reconciler loop which: /// 1. recreates pools which are not present following a mayastor restart @@ -51,7 +52,7 @@ impl TaskPoller for PoolReconciler { /// crashed/restarted. /// In such a case, we issue a new create pool request against the mayastor instance where the pool /// should exist. 
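The `notify` change earlier in this patch switches to `try_send` on the bounded event channel, and the poller doubles the channel capacity. The following self-contained snippet (not project code, plain tokio; the dependency line is an assumption) shows the behaviour being guarded against: a full bounded mpsc buffer rejects further events instead of blocking, which is exactly the case `notify` now logs as a warning.

    // assumed Cargo dependency: tokio = { version = "1", features = ["full"] }
    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(2);
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        // Buffer is full: this send fails immediately, the event would be dropped.
        assert!(tx.try_send(3).is_err());
        assert_eq!(rx.recv().await, Some(1));
    }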
-#[tracing::instrument(skip(pool_spec, context), fields(pool.uuid = %pool_spec.lock().id))] +#[tracing::instrument(skip(pool_spec, context), level = "trace", fields(pool.uuid = %pool_spec.lock().id, request.reconcile = true))] async fn missing_pool_state_reconciler( pool_spec: Arc>, context: &PollContext, @@ -81,7 +82,7 @@ async fn missing_pool_state_reconciler( }; let node = match context.registry().get_node_wrapper(&pool.node).await { Ok(node) if !node.read().await.is_online() => { - let node_status = node.read().await.status().clone(); + let node_status = node.read().await.status(); warn_missing(&pool_spec, node_status); return PollResult::Ok(PollerState::Idle); } @@ -92,19 +93,25 @@ async fn missing_pool_state_reconciler( Ok(node) => node, }; - pool.warn_span(|| tracing::warn!("Attempting to recreate missing pool")); + async { + pool.warn_span(|| tracing::warn!("Attempting to recreate missing pool")); - let request = CreatePool::new(&pool.node, &pool.id, &pool.disks, &pool.labels); - match node.create_pool(&request).await { - Ok(_) => { - pool.info_span(|| tracing::info!("Pool successfully recreated")); - PollResult::Ok(PollerState::Idle) - } - Err(error) => { - pool.error_span(|| tracing::error!(error=%error, "Failed to recreate the pool")); - Err(error) + let request = CreatePool::new(&pool.node, &pool.id, &pool.disks, &pool.labels); + match node.create_pool(&request).await { + Ok(_) => { + pool.info_span(|| tracing::info!("Pool successfully recreated")); + PollResult::Ok(PollerState::Idle) + } + Err(error) => { + pool.error_span( + || tracing::error!(error=%error, "Failed to recreate the pool"), + ); + Err(error) + } } } + .instrument(tracing::info_span!("missing_pool_state_reconciler", pool.uuid = %pool.id, request.reconcile = true)) + .await } else { PollResult::Ok(PollerState::Idle) } @@ -114,7 +121,7 @@ async fn missing_pool_state_reconciler( /// the pool deletion gets struck in Deleting state, this creates a problem as when /// the node comes up we cannot create a pool with same specs, the deleting_pool_spec_reconciler /// cleans up any such pool when node comes up. 
-#[tracing::instrument(skip(pool_spec, context), fields(pool.uuid = %pool_spec.lock().id))] +#[tracing::instrument(skip(pool_spec, context), level = "trace", fields(pool.uuid = %pool_spec.lock().id, request.reconcile = true))] async fn deleting_pool_spec_reconciler( pool_spec: Arc>, context: &PollContext, @@ -123,6 +130,7 @@ async fn deleting_pool_spec_reconciler( // nothing to do here return PollResult::Ok(PollerState::Idle); } + let pool = pool_spec.lock().clone(); match context .registry() @@ -136,22 +144,27 @@ async fn deleting_pool_spec_reconciler( } Err(_) => return PollResult::Ok(PollerState::Idle), }; - let request = DestroyPool { - node: pool.node.clone(), - id: pool.id.clone(), - }; - match context - .specs() - .destroy_pool(context.registry(), &request, OperationMode::Exclusive) - .await - { - Ok(_) => { - pool.info_span(|| tracing::info!("Pool deleted successfully")); - PollResult::Ok(PollerState::Idle) - } - Err(error) => { - pool.error_span(|| tracing::error!(error=%error, "Failed to delete the pool")); - Err(error) + + async { + let request = DestroyPool { + node: pool.node.clone(), + id: pool.id.clone(), + }; + match context + .specs() + .destroy_pool(context.registry(), &request, OperationMode::Exclusive) + .await + { + Ok(_) => { + pool.info_span(|| tracing::info!("Pool deleted successfully")); + PollResult::Ok(PollerState::Idle) + } + Err(error) => { + pool.error_span(|| tracing::error!(error=%error, "Failed to delete the pool")); + Err(error) + } } } + .instrument(tracing::info_span!("deleting_pool_spec_reconciler", pool.uuid = %pool.id, request.reconcile = true)) + .await } diff --git a/control-plane/agents/core/src/core/reconciler/volume/garbage_collector.rs b/control-plane/agents/core/src/core/reconciler/volume/garbage_collector.rs index 01d3c23b4..eca8e6017 100644 --- a/control-plane/agents/core/src/core/reconciler/volume/garbage_collector.rs +++ b/control-plane/agents/core/src/core/reconciler/volume/garbage_collector.rs @@ -8,7 +8,10 @@ use common_lib::types::v0::store::{volume::VolumeSpec, OperationMode, TraceSpan, use crate::core::specs::SpecOperations; use common::errors::SvcError; -use common_lib::types::v0::store::{nexus_persistence::NexusInfo, replica::ReplicaSpec}; +use common_lib::types::v0::{ + message_bus::VolumeStatus, + store::{nexus_persistence::NexusInfo, replica::ReplicaSpec}, +}; use parking_lot::Mutex; use std::sync::Arc; @@ -119,20 +122,35 @@ async fn disown_unused_replicas( // don't attempt to disown the replicas if the nexus that should own them is not stable return PollResult::Ok(PollerState::Busy); } + + let volume_state = context + .registry() + .get_volume_state(&volume_clone.uuid) + .await?; + if matches!( + volume_state.status, + VolumeStatus::Faulted | VolumeStatus::Unknown + ) { + // don't attempt to disown the replicas if the volume state is faulted or unknown + return PollResult::Ok(PollerState::Busy); + } + let mut nexus_info = None; // defer reading from the persistent store unless we find a candidate let mut results = vec![]; for replica in context.specs().get_volume_replicas(&volume_clone.uuid) { let _guard = match replica.operation_guard(OperationMode::ReconcileStart) { Ok(guard) => guard, - Err(_) => return PollResult::Ok(PollerState::Busy), + Err(_) => continue, }; let replica_clone = replica.lock().clone(); + let replica_in_target = target.lock().contains_replica(&replica_clone.uuid); let replica_online = matches!(context.registry().get_replica(&replica_clone.uuid).await, Ok(state) if state.online()); if !replica_online && 
replica_clone.owners.owned_by(&volume_clone.uuid) && !replica_clone.owners.owned_by_a_nexus() + && !replica_in_target && !is_replica_healthy(context, &mut nexus_info, &replica_clone, &volume_clone).await? { volume_clone.warn_span(|| tracing::warn!(replica.uuid = %replica_clone.uuid, "Attempting to disown unused replica")); @@ -181,7 +199,13 @@ async fn is_replica_healthy( } Some(info) => info, }; - Ok(info.is_replica_healthy(&replica_spec.uuid)) + if info.no_healthy_replicas() { + Err(SvcError::NoHealthyReplicas { + id: volume_spec.uuid(), + }) + } else { + Ok(info.is_replica_healthy(&replica_spec.uuid)) + } } #[cfg(test)] diff --git a/control-plane/agents/core/src/core/reconciler/volume/hot_spare.rs b/control-plane/agents/core/src/core/reconciler/volume/hot_spare.rs index 1c1b4203a..bceb67b11 100644 --- a/control-plane/agents/core/src/core/reconciler/volume/hot_spare.rs +++ b/control-plane/agents/core/src/core/reconciler/volume/hot_spare.rs @@ -13,7 +13,10 @@ use common_lib::{ }, }; -use common_lib::types::v0::store::{TraceSpan, TraceStrLog}; +use common_lib::types::v0::{ + message_bus::Nexus, + store::{TraceSpan, TraceStrLog}, +}; use parking_lot::Mutex; use snafu::OptionExt; use std::{cmp::Ordering, sync::Arc}; @@ -154,7 +157,6 @@ async fn missing_children_remover( /// Given a degraded volume /// When the nexus spec has a different number of children to the number of volume replicas /// Then the nexus spec should eventually have as many children as the number of volume replicas -#[tracing::instrument(skip(context, volume_spec, nexus_spec, mode), fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] async fn nexus_replica_count_reconciler( volume_spec: &Arc>, nexus_spec: &Arc>, @@ -181,6 +183,34 @@ async fn nexus_replica_count_reconciler( counter }); + match nexus_replica_children.cmp(&volume_replicas) { + Ordering::Less | Ordering::Greater => { + nexus_replica_count_reconciler_traced( + volume_spec, + nexus_spec, + nexus_state, + nexus_replica_children, + context, + mode, + ) + .await + } + Ordering::Equal => PollResult::Ok(PollerState::Idle), + } +} +#[tracing::instrument(skip(context, volume_spec, nexus_spec, mode), fields(nexus.uuid = %nexus_spec.lock().uuid, request.reconcile = true))] +async fn nexus_replica_count_reconciler_traced( + volume_spec: &Arc>, + nexus_spec: &Arc>, + nexus_state: Nexus, + nexus_replica_children: usize, + context: &PollContext, + mode: OperationMode, +) -> PollResult { + let vol_spec_clone = volume_spec.lock().clone(); + let nexus_spec_clone = nexus_spec.lock().clone(); + let volume_replicas = vol_spec_clone.num_replicas as usize; + match nexus_replica_children.cmp(&volume_replicas) { Ordering::Less => { nexus_spec_clone.warn_span(|| { @@ -234,7 +264,6 @@ async fn nexus_replica_count_reconciler( /// When the number of created volume replicas is different to the required number of replicas /// Then the number of created volume replicas should eventually match the required number of /// replicas -#[tracing::instrument(level = "debug", skip(context, volume_spec), fields(volume.uuid = %volume_spec.lock().uuid, request.reconcile = true))] async fn volume_replica_count_reconciler( volume_spec: &Arc>, context: &PollContext, @@ -244,6 +273,27 @@ async fn volume_replica_count_reconciler( let volume_uuid = volume_spec_clone.uuid.clone(); let required_replica_count = volume_spec_clone.num_replicas as usize; + let current_replicas = context.specs().get_volume_replicas(&volume_uuid); + let current_replica_count = current_replicas.len(); + + match 
current_replica_count.cmp(&required_replica_count) { + Ordering::Less | Ordering::Greater => { + volume_replica_count_reconciler_traced(volume_spec, context, mode).await + } + Ordering::Equal => PollResult::Ok(PollerState::Idle), + } +} + +#[tracing::instrument(skip(context, volume_spec, mode), fields(volume.uuid = %volume_spec.lock().uuid, request.reconcile = true))] +async fn volume_replica_count_reconciler_traced( + volume_spec: &Arc>, + context: &PollContext, + mode: OperationMode, +) -> PollResult { + let volume_spec_clone = volume_spec.lock().clone(); + let volume_uuid = volume_spec_clone.uuid.clone(); + let required_replica_count = volume_spec_clone.num_replicas as usize; + let current_replicas = context.specs().get_volume_replicas(&volume_uuid); let mut current_replica_count = current_replicas.len(); diff --git a/control-plane/agents/core/src/core/reconciler/volume/nexus.rs b/control-plane/agents/core/src/core/reconciler/volume/nexus.rs index 937b3ad4e..4e5c48968 100644 --- a/control-plane/agents/core/src/core/reconciler/volume/nexus.rs +++ b/control-plane/agents/core/src/core/reconciler/volume/nexus.rs @@ -9,6 +9,8 @@ use crate::core::{ use common_lib::types::v0::store::{volume::VolumeSpec, OperationMode}; +use crate::core::reconciler::nexus::faulted_nexus_remover; +use common_lib::types::v0::message_bus::VolumeStatus; use parking_lot::Mutex; use std::sync::Arc; @@ -36,7 +38,7 @@ impl TaskPoller for VolumeNexusReconciler { } } -#[tracing::instrument(level = "debug", skip(context, volume_spec), fields(volume.uuid = %volume_spec.lock().uuid, request.reconcile = true))] +#[tracing::instrument(level = "trace", skip(context, volume_spec), fields(volume.uuid = %volume_spec.lock().uuid, request.reconcile = true))] async fn volume_nexus_reconcile( volume_spec: &Arc>, context: &PollContext, @@ -63,7 +65,12 @@ async fn volume_nexus_reconcile( return PollResult::Ok(PollerState::Idle); } - missing_nexus_recreate(&nexus_spec, context, mode).await?; + let volume_state = context.registry().get_volume_state(&volume.uuid).await?; + + if volume_state.status != VolumeStatus::Online { + faulted_nexus_remover(&nexus_spec, context, mode).await?; + missing_nexus_recreate(&nexus_spec, context, mode).await?; + } fixup_nexus_protocol(&nexus_spec, context, mode).await } None => PollResult::Ok(PollerState::Idle), diff --git a/control-plane/agents/core/src/core/registry.rs b/control-plane/agents/core/src/core/registry.rs index 49135e909..efd94d637 100644 --- a/control-plane/agents/core/src/core/registry.rs +++ b/control-plane/agents/core/src/core/registry.rs @@ -260,17 +260,19 @@ impl Registry { /// Poll each node for resource updates async fn poller(&self) { loop { - let nodes = self.nodes().read().await.clone(); - for (_, node) in nodes.iter() { - let lock = node.grpc_lock().await; - let _guard = lock.lock().await; - - let mut node_clone = node.write().await.clone(); - if let Err(e) = node_clone.reload().await { - tracing::trace!("Failed to reload node {}. 
Error {:?}.", node_clone.id, e); + { + let nodes = self.nodes().read().await; + for (_, node) in nodes.iter() { + let (id, online) = { + let node = node.read().await; + (node.id().clone(), node.is_online()) + }; + if online { + if let Err(error) = node.update_all(false).await { + tracing::error!(node = %id, error = %error, "Failed to reload node"); + } + } } - // update node in the registry - *node.write().await = node_clone; } tokio::time::sleep(self.cache_period).await; } diff --git a/control-plane/agents/core/src/core/scheduling/resources/mod.rs b/control-plane/agents/core/src/core/scheduling/resources/mod.rs index 7edf69a73..262aee6be 100644 --- a/control-plane/agents/core/src/core/scheduling/resources/mod.rs +++ b/control-plane/agents/core/src/core/scheduling/resources/mod.rs @@ -4,7 +4,11 @@ use crate::core::{ }; use common_lib::types::v0::{ message_bus::{Child, ChildUri, Replica}, - store::{nexus_child::NexusChild, nexus_persistence::ChildInfo, replica::ReplicaSpec}, + store::{ + nexus_child::NexusChild, + nexus_persistence::{ChildInfo, NexusInfo}, + replica::ReplicaSpec, + }, }; #[derive(Debug, Clone)] @@ -119,23 +123,30 @@ pub(crate) struct ChildItem { #[derive(Debug, Clone)] pub(crate) enum HealthyChildItems { /// One with multiple healthy candidates - One(Vec), + One(Option, Vec), /// All the healthy replicas can be used - All(Vec), + All(Option, Vec), } impl HealthyChildItems { /// Check if there are no healthy children pub(crate) fn is_empty(&self) -> bool { match self { - HealthyChildItems::One(items) => items.is_empty(), - HealthyChildItems::All(items) => items.is_empty(), + HealthyChildItems::One(_, items) => items.is_empty(), + HealthyChildItems::All(_, items) => items.is_empty(), } } /// Get a reference to the list of candidates pub(crate) fn candidates(&self) -> &Vec { match self { - HealthyChildItems::One(items) => items, - HealthyChildItems::All(items) => items, + HealthyChildItems::One(_, items) => items, + HealthyChildItems::All(_, items) => items, + } + } + /// Get a reference to the list of candidates + pub(crate) fn nexus_info(&self) -> &Option { + match self { + HealthyChildItems::One(info, _) => info, + HealthyChildItems::All(info, _) => info, } } } diff --git a/control-plane/agents/core/src/core/scheduling/volume.rs b/control-plane/agents/core/src/core/scheduling/volume.rs index de024a3f5..18005ba95 100644 --- a/control-plane/agents/core/src/core/scheduling/volume.rs +++ b/control-plane/agents/core/src/core/scheduling/volume.rs @@ -169,7 +169,7 @@ impl GetChildForRemoval { /// Used to filter nexus children in order to choose the best candidates for removal /// when the volume's replica count is being reduced. 
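The `HealthyChildItems` variants above now carry the persisted `NexusInfo`, whose new `no_healthy_replicas` helper (added earlier in this patch) is what consumers such as `missing_nexus_recreate` consult before giving up. A self-contained restatement of that predicate, with `ChildInfo` reduced to just its `healthy` flag for illustration:

    struct ChildInfo { healthy: bool }

    /// True when every persisted child is unhealthy, or when there are none at all.
    fn no_healthy_replicas(children: &[ChildInfo]) -> bool {
        children.iter().all(|c| !c.healthy) || children.is_empty()
    }

    fn main() {
        assert!(no_healthy_replicas(&[]));
        assert!(no_healthy_replicas(&[ChildInfo { healthy: false }]));
        assert!(!no_healthy_replicas(&[ChildInfo { healthy: false }, ChildInfo { healthy: true }]));
    }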
-#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct GetChildForRemovalContext { registry: Registry, spec: VolumeSpec, @@ -177,6 +177,16 @@ pub(crate) struct GetChildForRemovalContext { nexus_info: Option, unused_only: bool, } +impl std::fmt::Debug for GetChildForRemovalContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GetChildForRemovalContext") + .field("spec", &self.spec) + .field("state", &self.state) + .field("nexus_info", &self.nexus_info) + .field("unused_only", &self.unused_only) + .finish() + } +} impl GetChildForRemovalContext { async fn new(registry: &Registry, request: &GetChildForRemoval) -> Result { diff --git a/control-plane/agents/core/src/core/specs.rs b/control-plane/agents/core/src/core/specs.rs index 4d4ee80d5..2f1a338e6 100644 --- a/control-plane/agents/core/src/core/specs.rs +++ b/control-plane/agents/core/src/core/specs.rs @@ -667,6 +667,17 @@ impl ResourceSpecsLocked { ); } } + + // patch up the missing replica nexus owners + let nexuses = self.get_nexuses(); + for replica in self.get_replicas() { + let replica_uuid = replica.lock().uuid.clone(); + + nexuses + .iter() + .filter(|n| n.lock().contains_replica(&replica_uuid)) + .for_each(|n| replica.lock().owners.add_owner(&n.lock().uuid)); + } } /// Deserialise a vector of serde_json values into specific spec types. diff --git a/control-plane/agents/core/src/core/wrapper.rs b/control-plane/agents/core/src/core/wrapper.rs index ca574dddb..415a08628 100644 --- a/control-plane/agents/core/src/core/wrapper.rs +++ b/control-plane/agents/core/src/core/wrapper.rs @@ -1,20 +1,42 @@ use super::{super::node::watchdog::Watchdog, grpc::GrpcContext}; +use crate::{ + core::{ + grpc::{GrpcClient, GrpcClientLocked}, + states::{ResourceStates, ResourceStatesLocked}, + }, + node::service::NodeCommsTimeout, +}; + use common::{ errors::{GrpcRequestError, SvcError}, v0::msg_translation::{MessageBusToRpc, RpcToMessageBus, TryRpcToMessageBus}, }; use common_lib::{ - mbus_api::ResourceKind, - types::v0::message_bus::{ - AddNexusChild, Child, CreateNexus, CreatePool, CreateReplica, DestroyNexus, DestroyPool, - DestroyReplica, Nexus, NexusId, NodeId, NodeState, NodeStatus, PoolId, PoolState, - PoolStatus, Protocol, RemoveNexusChild, Replica, ReplicaId, ShareNexus, ShareReplica, - UnshareNexus, UnshareReplica, + mbus_api::{Message, MessageId, MessageIdTimeout, ResourceKind}, + types::v0::{ + message_bus::{ + AddNexusChild, Child, CreateNexus, CreatePool, CreateReplica, DestroyNexus, + DestroyPool, DestroyReplica, MessageIdVs, Nexus, NexusId, NodeId, NodeState, + NodeStatus, PoolId, PoolState, PoolStatus, Protocol, RemoveNexusChild, Replica, + ReplicaId, ShareNexus, ShareReplica, UnshareNexus, UnshareReplica, + }, + store, + store::{nexus::NexusState, replica::ReplicaState}, }, }; + +use async_trait::async_trait; use rpc::mayastor::Null; use snafu::ResultExt; -use std::cmp::Ordering; +use std::{ + cmp::Ordering, + ops::{Deref, DerefMut}, + sync::Arc, +}; + +type NodeResourceStates = (Vec, Vec, Vec); +/// Default timeout for GET* gRPC requests (ex: GetPools, GetNexuses, etc..) +const GETS_TIMEOUT: MessageIdVs = MessageIdVs::Default; /// Wrapper over a `Node` plus a few useful methods/properties. 
Includes: /// all pools and replicas from the node @@ -75,8 +97,8 @@ impl NodeWrapper { ) -> Result { GrpcContext::new( self.lock.clone(), - &self.id, - &self.node_state.grpc_endpoint, + self.id(), + &self.endpoint_str(), &self.comms_timeouts, Some(request), ) @@ -89,8 +111,8 @@ impl NodeWrapper { ) -> Result { GrpcContext::new( self.lock.clone(), - &self.id, - &self.node_state.grpc_endpoint, + self.id(), + &self.endpoint_str(), &timeout, None::, ) @@ -100,8 +122,8 @@ impl NodeWrapper { pub(crate) fn grpc_context(&self) -> Result { GrpcContext::new( self.lock.clone(), - &self.id, - &self.node_state.grpc_endpoint, + self.id(), + &self.endpoint_str(), &self.comms_timeouts, None::, ) @@ -112,17 +134,13 @@ impl NodeWrapper { self.watchdog.timestamp().elapsed() > self.watchdog.deadline() } - /// On_register callback when the node is registered with the registry - pub(crate) async fn on_register(&mut self) { + /// "Pet" the node to meet the node's watchdog timer deadline + pub(crate) async fn pet(&mut self) { self.watchdog.pet().await.ok(); if self.missed_deadline { - tracing::info!(node.uuid=%self.id, "The node had missed the heartbeat deadline but it's now re-registered itself"); + tracing::info!(node.uuid=%self.id(), "The node had missed the heartbeat deadline but it's now re-registered itself"); } self.missed_deadline = false; - if self.set_status(NodeStatus::Online) != NodeStatus::Online { - // if a node reappears as online, then reload its information - self.reload().await.ok(); - } } /// Update the node liveness if the watchdog's registration expired @@ -132,7 +150,7 @@ impl NodeWrapper { if !self.missed_deadline { tracing::error!( "Node id '{}' missed the registration deadline of {:?}", - self.id, + self.id(), self.watchdog.deadline() ); } @@ -142,13 +160,13 @@ impl NodeWrapper { && self.watchdog.pet().await.is_ok() { if !self.missed_deadline { - tracing::warn!(node.uuid=%self.id, "The node missed the heartbeat deadline but it's still responding to gRPC so we're considering it online"); + tracing::warn!(node.uuid=%self.id(), "The node missed the heartbeat deadline but it's still responding to gRPC so we're considering it online"); } } else { if self.missed_deadline { tracing::error!( "Node id '{}' missed the registration deadline of {:?}", - self.id, + self.id(), self.watchdog.deadline() ); } @@ -166,38 +184,38 @@ impl NodeWrapper { let mut ctx = self.grpc_client_timeout(timeouts).await?; let _ = ctx - .client + .mayastor .get_mayastor_info(rpc::mayastor::Null {}) .await .map_err(|_| SvcError::NodeNotOnline { - node: self.id.to_owned(), + node: self.id().to_owned(), })?; Ok(()) } /// Set the node status and return the previous status - pub(crate) fn set_status(&mut self, state: NodeStatus) -> NodeStatus { - let previous = self.status.clone(); - if self.node_state.status != state { - if state == NodeStatus::Online { + pub(crate) fn set_status(&mut self, next: NodeStatus) -> NodeStatus { + let previous = self.status(); + if previous != next { + if next == NodeStatus::Online { tracing::info!( "Node '{}' changing from {} to {}", - self.node_state.id, - self.node_state.status.to_string(), - state.to_string(), + self.id(), + previous.to_string(), + next.to_string(), ); } else { tracing::warn!( "Node '{}' changing from {} to {}", - self.node_state.id, - self.node_state.status.to_string(), - state.to_string(), + self.id(), + previous.to_string(), + next.to_string(), ); } - self.node_state.status = state; + self.node_state.status = next; if self.node_state.status == NodeStatus::Unknown { - 
self.watchdog.disarm() + self.watchdog_mut().disarm() } } // Clear the states, otherwise we could temporarily return pools/nexus as online, even @@ -211,7 +229,17 @@ impl NodeWrapper { /// Clear all states from the node fn clear_states(&mut self) { - self.states.write().clear_all(); + self.resources_mut().clear_all(); + } + + /// Get the inner states + fn resources(&self) -> parking_lot::RwLockReadGuard { + self.states.read() + } + + /// Get the inner resource states + fn resources_mut(&self) -> parking_lot::RwLockWriteGuard { + self.states.write() } /// Get a mutable reference to the node's watchdog @@ -222,10 +250,22 @@ impl NodeWrapper { pub(crate) fn node_state(&self) -> &NodeState { &self.node_state } + /// Get the node `NodeId` + pub(crate) fn id(&self) -> &NodeId { + self.node_state().id() + } + /// Get the node `NodeStatus` + pub(crate) fn status(&self) -> NodeStatus { + self.node_state().status().clone() + } + + /// Get the node grpc endpoint as string. + pub(crate) fn endpoint_str(&self) -> String { + self.node_state().grpc_endpoint.clone() + } /// Get all pools pub(crate) fn pools(&self) -> Vec { - self.states - .read() + self.resources() .get_pool_states() .iter() .map(|p| p.pool.clone()) @@ -233,9 +273,8 @@ impl NodeWrapper { } /// Get all pool wrappers pub(crate) fn pool_wrappers(&self) -> Vec { - let state = self.states.read(); - let pools = state.get_pool_states(); - let replicas = state.get_replica_states(); + let pools = self.resources().get_pool_states(); + let replicas = self.resources().get_replica_states(); pools .into_iter() .map(|p| { @@ -250,15 +289,15 @@ impl NodeWrapper { } /// Get all pool states pub(crate) fn pool_states(&self) -> Vec { - self.states.read().get_pool_states() + self.resources().get_pool_states() } /// Get pool from `pool_id` or None pub(crate) fn pool(&self, pool_id: &PoolId) -> Option { - self.states.read().get_pool_state(pool_id).map(|p| p.pool) + self.resources().get_pool_state(pool_id).map(|p| p.pool) } /// Get a PoolWrapper for the pool ID. 
pub(crate) fn pool_wrapper(&self, pool_id: &PoolId) -> Option { - let r = self.states.read(); + let r = self.resources(); match r.get_pool_states().iter().find(|p| &p.pool.id == pool_id) { Some(pool_state) => { let replicas: Vec = self @@ -273,8 +312,7 @@ impl NodeWrapper { } /// Get all replicas pub(crate) fn replicas(&self) -> Vec { - self.states - .read() + self.resources() .get_replica_states() .iter() .map(|r| r.replica.clone()) @@ -282,12 +320,11 @@ impl NodeWrapper { } /// Get all replica states pub(crate) fn replica_states(&self) -> Vec { - self.states.read().get_replica_states() + self.resources().get_replica_states() } /// Get all nexuses fn nexuses(&self) -> Vec { - self.states - .read() + self.resources() .get_nexus_states() .iter() .map(|nexus_state| nexus_state.nexus.clone()) @@ -295,47 +332,44 @@ impl NodeWrapper { } /// Get all nexus states pub(crate) fn nexus_states(&self) -> Vec { - self.states.read().get_nexus_states() + self.resources().get_nexus_states() } /// Get nexus fn nexus(&self, nexus_id: &NexusId) -> Option { - self.states - .read() - .get_nexus_state(nexus_id) - .map(|s| s.nexus) + self.resources().get_nexus_state(nexus_id).map(|s| s.nexus) } /// Get replica from `replica_id` pub(crate) fn replica(&self, replica_id: &ReplicaId) -> Option { - self.states - .read() + self.resources() .get_replica_state(replica_id) .map(|r| r.replica) } /// Is the node online pub(crate) fn is_online(&self) -> bool { - self.node_state.status == NodeStatus::Online + self.status() == NodeStatus::Online } /// Load the node by fetching information from mayastor pub(crate) async fn load(&mut self) -> Result<(), SvcError> { tracing::info!( "Preloading node '{}' on endpoint '{}'", - self.id, - self.grpc_endpoint + self.id(), + self.endpoint_str() ); - match self.fetch_resources().await { + let mut client = self.grpc_client().await?; + match self.fetch_resources(&mut client).await { Ok((replicas, pools, nexuses)) => { - let mut states = self.states.write(); + let mut states = self.resources_mut(); states.update(pools, replicas, nexuses); Ok(()) } Err(error) => { - self.node_state.status = NodeStatus::Unknown; + self.set_status(NodeStatus::Unknown); tracing::error!( "Preloading of node '{}' on endpoint '{}' failed with error: {:?}", - self.id, - self.grpc_endpoint, + self.id(), + self.endpoint_str(), error ); Err(error) @@ -343,36 +377,45 @@ impl NodeWrapper { } } - /// Reload the node by fetching information from mayastor - pub(crate) async fn reload(&mut self) -> Result<(), SvcError> { - if self.is_online() { + /// Update the node by updating its state from the states fetched from mayastor + fn update( + &mut self, + setting_online: bool, + fetch_result: Result, + ) -> Result<(), SvcError> { + if self.is_online() || setting_online { tracing::trace!( "Reloading node '{}' on endpoint '{}'", - self.id, - self.grpc_endpoint + self.id(), + self.endpoint_str() ); - match self.fetch_resources().await { + match fetch_result { Ok((replicas, pools, nexuses)) => { - let mut states = self.states.write(); - states.update(pools, replicas, nexuses); + self.resources_mut().update(pools, replicas, nexuses); + if setting_online { + // we only set it as online after we've updated the resource states + // so an online node should be "up-to-date" + self.set_status(NodeStatus::Online); + } Ok(()) } - Err(e) => { + Err(error) => { self.set_status(NodeStatus::Unknown); - Err(e) + tracing::trace!("Failed to reload node {}. 
Error {:?}.", self.id(), error); + Err(error) } } } else { tracing::trace!( "Skipping reload of node '{}' since it's '{:?}'", - self.id, - self.status + self.id(), + self.node_state() ); // should already be cleared self.clear_states(); Err(SvcError::NodeNotOnline { - node: self.id.to_owned(), + node: self.id().to_owned(), }) } } @@ -380,43 +423,49 @@ impl NodeWrapper { /// Fetch the various resources from Mayastor. async fn fetch_resources( &self, - ) -> Result<(Vec, Vec, Vec), SvcError> { - let replicas = self.fetch_replicas().await?; - let pools = self.fetch_pools().await?; - let nexuses = self.fetch_nexuses().await?; + client: &mut GrpcClient, + ) -> Result { + let replicas = self.fetch_replicas(client).await?; + let pools = self.fetch_pools(client).await?; + let nexuses = self.fetch_nexuses(client).await?; Ok((replicas, pools, nexuses)) } /// Fetch all replicas from this node via gRPC - pub(crate) async fn fetch_replicas(&self) -> Result, SvcError> { - let mut ctx = self.grpc_client().await?; + pub(crate) async fn fetch_replicas( + &self, + client: &mut GrpcClient, + ) -> Result, SvcError> { let rpc_replicas = - ctx.client + client + .mayastor .list_replicas_v2(Null {}) .await .context(GrpcRequestError { resource: ResourceKind::Replica, request: "list_replicas", })?; + let rpc_replicas = &rpc_replicas.get_ref().replicas; let pools = rpc_replicas .iter() - .map(|p| match rpc_replica_to_bus(p, &self.id) { + .filter_map(|p| match rpc_replica_to_bus(p, self.id()) { Ok(r) => Some(r), Err(error) => { tracing::error!(error=%error, "Could not convert rpc replica"); None } }) - .flatten() .collect(); Ok(pools) } /// Fetch all pools from this node via gRPC - pub(crate) async fn fetch_pools(&self) -> Result, SvcError> { - let mut ctx = self.grpc_client().await?; - let rpc_pools = ctx - .client + pub(crate) async fn fetch_pools( + &self, + client: &mut GrpcClient, + ) -> Result, SvcError> { + let rpc_pools = client + .mayastor .list_pools(Null {}) .await .context(GrpcRequestError { @@ -426,84 +475,71 @@ impl NodeWrapper { let rpc_pools = &rpc_pools.get_ref().pools; let pools = rpc_pools .iter() - .map(|p| rpc_pool_to_bus(p, &self.id)) + .map(|p| rpc_pool_to_bus(p, self.id())) .collect(); Ok(pools) } /// Fetch all nexuses from the node via gRPC - pub(crate) async fn fetch_nexuses(&self) -> Result, SvcError> { - let mut ctx = self.grpc_client().await?; - let rpc_nexuses = ctx - .client - .list_nexus_v2(Null {}) - .await - .context(GrpcRequestError { - resource: ResourceKind::Nexus, - request: "list_nexus", - })?; + pub(crate) async fn fetch_nexuses( + &self, + client: &mut GrpcClient, + ) -> Result, SvcError> { + let rpc_nexuses = + client + .mayastor + .list_nexus_v2(Null {}) + .await + .context(GrpcRequestError { + resource: ResourceKind::Nexus, + request: "list_nexus", + })?; let rpc_nexuses = &rpc_nexuses.get_ref().nexus_list; let nexuses = rpc_nexuses .iter() - .map(|n| match rpc_nexus_v2_to_bus(n, &self.id) { + .filter_map(|n| match rpc_nexus_v2_to_bus(n, self.id()) { Ok(n) => Some(n), Err(error) => { tracing::error!(error=%error, "Could not convert rpc nexus"); None } }) - .flatten() .collect(); Ok(nexuses) } /// Update all the nexus states. 
- async fn update_nexus_states(&self) -> Result<(), SvcError> { - let nexuses = self.fetch_nexuses().await?; - self.states.write().update_nexuses(nexuses); + async fn update_nexus_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> { + let nexuses = self.fetch_nexuses(client).await?; + self.resources_mut().update_nexuses(nexuses); Ok(()) } - async fn update_pool_states(&self) -> Result<(), SvcError> { - let pools = self.fetch_pools().await?; - self.states.write().update_pools(pools); + /// Update all the pool states. + async fn update_pool_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> { + let pools = self.fetch_pools(client).await?; + self.resources_mut().update_pools(pools); Ok(()) } - async fn update_replica_states(&self) -> Result<(), SvcError> { - let replicas = self.fetch_replicas().await?; - self.states.write().update_replicas(replicas); + /// Update all the replica states. + async fn update_replica_states(&self, client: &mut GrpcClient) -> Result<(), SvcError> { + let replicas = self.fetch_replicas(client).await?; + self.resources_mut().update_replicas(replicas); Ok(()) } } -impl std::ops::Deref for NodeWrapper { - type Target = NodeState; - fn deref(&self) -> &Self::Target { - &self.node_state - } -} - -use crate::{ - core::{ - grpc::{GrpcClient, GrpcClientLocked}, - states::ResourceStatesLocked, - }, - node::service::NodeCommsTimeout, -}; -use async_trait::async_trait; -use common_lib::{ - mbus_api::{Message, MessageId, MessageIdTimeout}, - types::v0::{ - store, - store::{nexus::NexusState, replica::ReplicaState}, - }, -}; -use std::{ops::Deref, sync::Arc}; - /// CRUD Operations on a locked mayastor `NodeWrapper` such as: /// pools, replicas, nexuses and their children #[async_trait] -pub trait ClientOps { +pub(crate) trait ClientOps { + /// Get the grpc lock and client pair to execute the provided `request` + /// NOTE: Only available when the node status is online + async fn grpc_client_locked( + &self, + request: T, + ) -> Result; + /// Create a pool on the node via gRPC async fn create_pool(&self, request: &CreatePool) -> Result; /// Destroy a pool on the node via gRPC async fn destroy_pool(&self, request: &DestroyPool) -> Result<(), SvcError>; @@ -534,19 +570,19 @@ pub trait ClientOps { /// of the `ClientOps` trait and the `Registry` itself #[async_trait] pub(crate) trait InternalOps { - /// Get the grpc lock and client pair to execute the provided `request` - async fn grpc_client_locked( - &self, - request: T, - ) -> Result; /// Get the inner lock, typically used to sync mutating gRPC operations async fn grpc_lock(&self) -> Arc>; /// Update the node's nexus state information - async fn update_nexus_states(&self) -> Result<(), SvcError>; + async fn update_nexus_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError>; /// Update the node's pool state information - async fn update_pool_states(&self) -> Result<(), SvcError>; + async fn update_pool_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError>; /// Update the node's replica state information - async fn update_replica_states(&self) -> Result<(), SvcError>; + async fn update_replica_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError>; + /// Update all node state information + async fn update_all(&self, setting_online: bool) -> Result<(), SvcError>; + /// OnRegister callback when a node is re-registered with the registry via its heartbeat + /// On success returns where it's reset the node as online or not. 
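A hedged sketch of how a caller might use the new `on_register` hook declared below (the calling site is not part of this diff; `node` is assumed to be the usual `Arc<tokio::sync::RwLock<NodeWrapper>>`, error handling elided, and the log line is purely illustrative):

    // Pet the watchdog on every heartbeat; only when the node was previously
    // offline does on_register refresh all states and flip it back to Online.
    let came_back_online = node.on_register().await?;
    if came_back_online {
        tracing::info!(node.uuid = %node.read().await.id(), "node re-registered and reloaded");
    }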
+ async fn on_register(&self) -> Result; } /// Getter operations on a mayastor locked `NodeWrapper` to get copies of its @@ -603,42 +639,73 @@ impl GetterOps for Arc> { #[async_trait] impl InternalOps for Arc> { - async fn grpc_client_locked( - &self, - request: T, - ) -> Result { - if !self.read().await.is_online() { - return Err(SvcError::NodeNotOnline { - node: self.read().await.id.clone(), - }); - } - let ctx = self.read().await.grpc_context_ext(request)?; - let client = ctx.connect_locked().await?; - Ok(client) - } async fn grpc_lock(&self) -> Arc> { self.write().await.lock.clone() } - async fn update_nexus_states(&self) -> Result<(), SvcError> { - self.read().await.update_nexus_states().await + async fn update_nexus_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError> { + self.read().await.update_nexus_states(ctx.deref_mut()).await + } + + async fn update_pool_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError> { + self.read().await.update_pool_states(ctx.deref_mut()).await + } + + async fn update_replica_states(&self, mut ctx: &mut GrpcClient) -> Result<(), SvcError> { + let node = self.read().await; + node.update_replica_states(ctx.deref_mut()).await } - async fn update_pool_states(&self) -> Result<(), SvcError> { - self.read().await.update_pool_states().await + async fn update_all(&self, setting_online: bool) -> Result<(), SvcError> { + let ctx = self.read().await.grpc_context_ext(GETS_TIMEOUT)?; + match ctx.connect_locked().await { + Ok(mut lock) => { + let results = self.read().await.fetch_resources(lock.deref_mut()).await; + + let mut node = self.write().await; + node.update(setting_online, results) + } + Err((_guard, error)) => { + self.write().await.set_status(NodeStatus::Unknown); + Err(error) + } + } } - async fn update_replica_states(&self) -> Result<(), SvcError> { - self.read().await.update_replica_states().await + async fn on_register(&self) -> Result { + let setting_online = { + let mut node = self.write().await; + node.pet().await; + !node.is_online() + }; + // if the node was not previously online then let's update all states right away + if setting_online { + self.update_all(setting_online).await.map(|_| true) + } else { + Ok(false) + } } } #[async_trait] impl ClientOps for Arc> { + async fn grpc_client_locked( + &self, + request: T, + ) -> Result { + if !self.read().await.is_online() { + return Err(SvcError::NodeNotOnline { + node: self.read().await.id().clone(), + }); + } + let ctx = self.read().await.grpc_context_ext(request)?; + ctx.connect_locked().await.map_err(|(_, error)| error) + } + async fn create_pool(&self, request: &CreatePool) -> Result { let mut ctx = self.grpc_client_locked(request.id()).await?; let rpc_pool = - ctx.client + ctx.mayastor .create_pool(request.to_rpc()) .await .context(GrpcRequestError { @@ -646,22 +713,24 @@ impl ClientOps for Arc> { request: "create_pool", })?; let pool = rpc_pool_to_bus(&rpc_pool.into_inner(), &request.node); - self.update_pool_states().await?; - self.update_replica_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_pool_states(ctx.deref_mut()).await?; + self.update_replica_states(ctx.deref_mut()).await?; Ok(pool) } /// Destroy a pool on the node via gRPC async fn destroy_pool(&self, request: &DestroyPool) -> Result<(), SvcError> { let mut ctx = self.grpc_client_locked(request.id()).await?; let _ = ctx - .client + .mayastor .destroy_pool(request.to_rpc()) .await .context(GrpcRequestError { resource: ResourceKind::Pool, request: "destroy_pool", })?; - 
self.update_pool_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_pool_states(ctx.deref_mut()).await?; Ok(()) } @@ -675,7 +744,7 @@ impl ClientOps for Arc> { } let mut ctx = self.grpc_client_locked(request.id()).await?; let rpc_replica = ctx - .client + .mayastor .create_replica_v2(request.to_rpc()) .await .context(GrpcRequestError { @@ -684,8 +753,9 @@ impl ClientOps for Arc> { })?; let replica = rpc_replica_to_bus(&rpc_replica.into_inner(), &request.node)?; - self.update_replica_states().await?; - self.update_pool_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_replica_states(ctx.deref_mut()).await?; + self.update_pool_states(ctx.deref_mut()).await?; Ok(replica) } @@ -693,7 +763,7 @@ impl ClientOps for Arc> { async fn share_replica(&self, request: &ShareReplica) -> Result { let mut ctx = self.grpc_client_locked(request.id()).await?; let share = ctx - .client + .mayastor .share_replica(request.to_rpc()) .await .context(GrpcRequestError { @@ -702,7 +772,8 @@ impl ClientOps for Arc> { })? .into_inner() .uri; - self.update_replica_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_replica_states(ctx.deref_mut()).await?; Ok(share) } @@ -710,7 +781,7 @@ impl ClientOps for Arc> { async fn unshare_replica(&self, request: &UnshareReplica) -> Result { let mut ctx = self.grpc_client_locked(request.id()).await?; let local_uri = ctx - .client + .mayastor .share_replica(request.to_rpc()) .await .context(GrpcRequestError { @@ -719,7 +790,8 @@ impl ClientOps for Arc> { })? .into_inner() .uri; - self.update_replica_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_replica_states(ctx.deref_mut()).await?; Ok(local_uri) } @@ -727,14 +799,15 @@ impl ClientOps for Arc> { async fn destroy_replica(&self, request: &DestroyReplica) -> Result<(), SvcError> { let mut ctx = self.grpc_client_locked(request.id()).await?; let _ = ctx - .client + .mayastor .destroy_replica(request.to_rpc()) .await .context(GrpcRequestError { resource: ResourceKind::Replica, request: "destroy_replica", })?; - self.update_replica_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_replica_states(ctx.deref_mut()).await?; // todo: remove when CAS-1107 is resolved if let Some(replica) = self.read().await.replica(&request.uuid) { if replica.pool == request.pool { @@ -743,7 +816,7 @@ impl ClientOps for Arc> { }); } } - self.update_pool_states().await?; + self.update_pool_states(ctx.deref_mut()).await?; Ok(()) } @@ -756,19 +829,20 @@ impl ClientOps for Arc> { }); } let mut ctx = self.grpc_client_locked(request.id()).await?; - let rpc_nexus = - ctx.client - .create_nexus_v2(request.to_rpc()) - .await - .context(GrpcRequestError { - resource: ResourceKind::Nexus, - request: "create_nexus", - })?; + let rpc_nexus = ctx + .mayastor + .create_nexus_v2(request.to_rpc()) + .await + .context(GrpcRequestError { + resource: ResourceKind::Nexus, + request: "create_nexus", + })?; let mut nexus = rpc_nexus_to_bus(&rpc_nexus.into_inner(), &request.node)?; // CAS-1107 - create_nexus_v2 returns NexusV1... 
nexus.name = request.name(); nexus.uuid = request.uuid.clone(); - self.update_nexus_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; Ok(nexus) } @@ -776,30 +850,32 @@ impl ClientOps for Arc> { async fn destroy_nexus(&self, request: &DestroyNexus) -> Result<(), SvcError> { let mut ctx = self.grpc_client_locked(request.id()).await?; let _ = ctx - .client + .mayastor .destroy_nexus(request.to_rpc()) .await .context(GrpcRequestError { resource: ResourceKind::Nexus, request: "destroy_nexus", })?; - self.update_nexus_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; Ok(()) } /// Share a nexus on the node via gRPC async fn share_nexus(&self, request: &ShareNexus) -> Result { let mut ctx = self.grpc_client_locked(request.id()).await?; - let share = ctx - .client - .publish_nexus(request.to_rpc()) - .await - .context(GrpcRequestError { - resource: ResourceKind::Nexus, - request: "publish_nexus", - })?; + let share = + ctx.mayastor + .publish_nexus(request.to_rpc()) + .await + .context(GrpcRequestError { + resource: ResourceKind::Nexus, + request: "publish_nexus", + })?; let share = share.into_inner().device_uri; - self.update_nexus_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; Ok(share) } @@ -807,22 +883,24 @@ impl ClientOps for Arc> { async fn unshare_nexus(&self, request: &UnshareNexus) -> Result<(), SvcError> { let mut ctx = self.grpc_client_locked(request.id()).await?; let _ = ctx - .client + .mayastor .unpublish_nexus(request.to_rpc()) .await .context(GrpcRequestError { resource: ResourceKind::Nexus, request: "unpublish_nexus", })?; - self.update_nexus_states().await?; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; Ok(()) } /// Add a child to a nexus via gRPC async fn add_child(&self, request: &AddNexusChild) -> Result { let mut ctx = self.grpc_client_locked(request.id()).await?; - let result = ctx.client.add_child_nexus(request.to_rpc()).await; - self.update_nexus_states().await?; + let result = ctx.mayastor.add_child_nexus(request.to_rpc()).await; + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; let rpc_child = match result { Ok(child) => Ok(child), Err(error) => { @@ -852,8 +930,10 @@ impl ClientOps for Arc> { /// Remove a child from its parent nexus via gRPC async fn remove_child(&self, request: &RemoveNexusChild) -> Result<(), SvcError> { let mut ctx = self.grpc_client_locked(request.id()).await?; - let result = ctx.client.remove_child_nexus(request.to_rpc()).await; - self.update_nexus_states().await?; + let result = ctx.mayastor.remove_child_nexus(request.to_rpc()).await; + + let mut ctx = ctx.reconnect(GETS_TIMEOUT).await?; + self.update_nexus_states(ctx.deref_mut()).await?; match result { Ok(_) => Ok(()), Err(error) => { @@ -986,7 +1066,7 @@ impl PoolWrapper { impl From<&NodeWrapper> for NodeState { fn from(node: &NodeWrapper) -> Self { - node.node_state.clone() + node.node_state().clone() } } diff --git a/control-plane/agents/core/src/nexus/scheduling.rs b/control-plane/agents/core/src/nexus/scheduling.rs index 938fb218a..ff28e5d43 100644 --- a/control-plane/agents/core/src/nexus/scheduling.rs +++ b/control-plane/agents/core/src/nexus/scheduling.rs @@ -15,15 +15,14 @@ async fn get_healthy_children( registry: &Registry, ) -> Result { let builder = 
nexus::CreateVolumeNexus::builder_with_defaults(request, registry).await?; - - if let Some(info) = &builder.context().nexus_info() { - if !info.clean_shutdown { - let items = builder.collect(); - return Ok(HealthyChildItems::One(items)); + let info = builder.context().nexus_info().clone(); + if let Some(info_inner) = &builder.context().nexus_info() { + if !info_inner.clean_shutdown { + return Ok(HealthyChildItems::One(info, builder.collect())); } } let items = builder.collect(); - Ok(HealthyChildItems::All(items)) + Ok(HealthyChildItems::All(info, items)) } /// Get all usable healthy child replicas for nexus recreation diff --git a/control-plane/agents/core/src/nexus/specs.rs b/control-plane/agents/core/src/nexus/specs.rs index e19041178..ac17c2850 100644 --- a/control-plane/agents/core/src/nexus/specs.rs +++ b/control-plane/agents/core/src/nexus/specs.rs @@ -394,7 +394,7 @@ impl ResourceSpecsLocked { .await?; let result = node.remove_child(&request.into()).await; - self.on_remove_disown_replica(registry, request).await; + self.on_remove_disown_replica(request, &result); SpecOperations::complete_update(registry, result, nexus_spec, spec_clone).await } else { @@ -477,13 +477,17 @@ impl ResourceSpecsLocked { } } - async fn on_remove_disown_replica(&self, registry: &Registry, request: &RemoveNexusReplica) { - if let Some(replica) = self.get_replica(request.replica.uuid()) { - replica - .lock() - .disown(&ReplicaOwners::new(None, vec![request.nexus.clone()])); - let clone = replica.lock().clone(); - let _ = registry.store_obj(&clone).await; + fn on_remove_disown_replica( + &self, + request: &RemoveNexusReplica, + result: &Result<(), SvcError>, + ) { + if result.is_ok() { + if let Some(replica) = self.get_replica(request.replica.uuid()) { + replica + .lock() + .disown(&ReplicaOwners::new(None, vec![request.nexus.clone()])); + } } } diff --git a/control-plane/agents/core/src/node/mod.rs b/control-plane/agents/core/src/node/mod.rs index 3405b73e0..2733e712c 100644 --- a/control-plane/agents/core/src/node/mod.rs +++ b/control-plane/agents/core/src/node/mod.rs @@ -37,19 +37,8 @@ async fn create_node_service(builder: &Service) -> service::Service { let deadline = CliArgs::args().deadline.into(); let request = CliArgs::args().request_timeout.into(); let connect = CliArgs::args().connect_timeout.into(); - let service = service::Service::new(registry.clone(), deadline, request, connect); - - // attempt to reload the node state based on the specification - for node in registry.specs().get_nodes() { - service - .register_state(&Register { - id: node.id().clone(), - grpc_endpoint: node.endpoint().to_string(), - }) - .await; - } - service + service::Service::new(registry.clone(), deadline, request, connect).await } #[cfg(test)] @@ -149,4 +138,27 @@ mod tests { &Node::new(maya_name.clone(), node.spec().cloned(), None) ); } + + #[tokio::test] + async fn large_cluster() { + let expected_nodes = 7; + let cluster = ClusterBuilder::builder() + .with_rest(false) + .with_agents(vec!["core"]) + .with_mayastors(expected_nodes as u32) + .with_node_deadline("2s") + .build() + .await + .unwrap(); + + let nodes = GetNodes::default().request().await.unwrap(); + tracing::info!("Nodes: {:?}", nodes); + assert_eq!(nodes.0.len(), expected_nodes); + + cluster.restart_core().await; + + let nodes = GetNodes::default().request().await.unwrap(); + tracing::info!("Nodes: {:?}", nodes); + assert_eq!(nodes.0.len(), expected_nodes); + } } diff --git a/control-plane/agents/core/src/node/service.rs 
b/control-plane/agents/core/src/node/service.rs index 63c43aed6..f49d9b35d 100644 --- a/control-plane/agents/core/src/node/service.rs +++ b/control-plane/agents/core/src/node/service.rs @@ -11,6 +11,7 @@ use common_lib::types::v0::message_bus::{ Filter, GetSpecs, Node, NodeId, NodeState, NodeStatus, Specs, States, }; +use crate::core::wrapper::InternalOps; use rpc::mayastor::ListBlockDevicesRequest; use snafu::ResultExt; use std::{collections::HashMap, sync::Arc}; @@ -53,17 +54,30 @@ impl NodeCommsTimeout { impl Service { /// New Node Service which uses the `registry` as its node cache and sets /// the `deadline` to each node's watchdog - pub(super) fn new( + pub(super) async fn new( registry: Registry, deadline: std::time::Duration, request: std::time::Duration, connect: std::time::Duration, ) -> Self { - Self { + let service = Self { registry, deadline, comms_timeouts: NodeCommsTimeout::new(connect, request), + }; + // attempt to reload the node state based on the specification + for node in service.registry.specs().get_nodes() { + service + .register_state( + &Register { + id: node.id().clone(), + grpc_endpoint: node.endpoint().to_string(), + }, + true, + ) + .await; } + service } fn specs(&self) -> &ResourceSpecsLocked { self.registry.specs() @@ -84,36 +98,57 @@ impl Service { /// Register a new node through the register information pub(super) async fn register(&self, registration: &Register) { self.registry.register_node_spec(registration).await; - self.register_state(registration).await; + self.register_state(registration, false).await; } - /// Attempt to Register a new node state through the register information - pub(super) async fn register_state(&self, registration: &Register) { - let node = NodeState { + /// Attempt to Register a new node state through the register information. + /// todo: if we enable concurrent registrations when we move to gRPC, we'll want + /// to make sure we don't process registrations for the same node in parallel. 
+ pub(super) async fn register_state(&self, registration: &Register, startup: bool) { + let node_state = NodeState { id: registration.id.clone(), grpc_endpoint: registration.grpc_endpoint.clone(), status: NodeStatus::Online, }; - let mut send_event = true; - let mut nodes = self.registry.nodes().write().await; - match nodes.get_mut(&node.id) { + let nodes = self.registry.nodes(); + let node = nodes.write().await.get_mut(&node_state.id).cloned(); + let send_event = match node { None => { - let mut node = NodeWrapper::new(&node, self.deadline, self.comms_timeouts.clone()); - if node.load().await.is_ok() { - node.watchdog_mut().arm(self.clone()); - nodes.insert(node.id.clone(), Arc::new(tokio::sync::RwLock::new(node))); + let mut node = + NodeWrapper::new(&node_state, self.deadline, self.comms_timeouts.clone()); + + let mut result = node.liveness_probe().await; + if result.is_ok() { + result = node.load().await; } - } - Some(node) => { - if node.read().await.status() == &NodeStatus::Online { - send_event = false; + match result { + Ok(_) => { + let mut nodes = self.registry.nodes().write().await; + if nodes.get_mut(&node_state.id).is_none() { + node.watchdog_mut().arm(self.clone()); + let node = Arc::new(tokio::sync::RwLock::new(node)); + nodes.insert(node_state.id().clone(), node); + true + } else { + false + } + } + Err(error) => { + tracing::warn!( + node = %node_state.id(), + error = %error, + "Failed to register node" + ); + false + } } - node.write().await.on_register().await; } - } + Some(node) => matches!(node.on_register().await, Ok(true)), + }; - if send_event { + // don't send these events on startup as the reconciler will start working afterwards anyway + if send_event && !startup { self.registry .notify(PollTriggerEvent::NodeStateChangeOnline) .await; @@ -193,7 +228,7 @@ impl Service { let mut client = grpc.connect().await?; let result = client - .client + .mayastor .list_block_devices(ListBlockDevicesRequest { all: request.all }) .await; diff --git a/control-plane/agents/core/src/pool/specs.rs b/control-plane/agents/core/src/pool/specs.rs index 7b7be8e99..4ceb62e69 100644 --- a/control-plane/agents/core/src/pool/specs.rs +++ b/control-plane/agents/core/src/pool/specs.rs @@ -197,6 +197,7 @@ impl ResourceSpecsLocked { let (_, _g) = SpecOperations::start_create(&pool_spec, registry, request, mode).await?; let result = node.create_pool(request).await; + let pool_state = SpecOperations::complete_create(result, &pool_spec, registry).await?; let pool_spec = pool_spec.lock().clone(); Ok(Pool::new(pool_spec, pool_state)) diff --git a/control-plane/agents/core/src/server.rs b/control-plane/agents/core/src/server.rs index 4c8dd64a9..8030881d9 100644 --- a/control-plane/agents/core/src/server.rs +++ b/control-plane/agents/core/src/server.rs @@ -115,6 +115,8 @@ fn init_tracing() { tracing_tags.dedup(); println!("Using the following tracing tags: {:?}", tracing_tags); + common_lib::opentelemetry::set_jaeger_env(); + global::set_text_map_propagator(TraceContextPropagator::new()); let tracer = opentelemetry_jaeger::new_pipeline() .with_agent_endpoint(jaeger) diff --git a/control-plane/agents/core/src/volume/scheduling.rs b/control-plane/agents/core/src/volume/scheduling.rs index f980130ff..41e95d400 100644 --- a/control-plane/agents/core/src/volume/scheduling.rs +++ b/control-plane/agents/core/src/volume/scheduling.rs @@ -61,13 +61,12 @@ pub(crate) async fn get_healthy_volume_replicas( registry: &Registry, ) -> Result { let builder = nexus::CreateVolumeNexus::builder_with_defaults(request, 
registry).await?; - - if let Some(info) = &builder.context().nexus_info() { - if !info.clean_shutdown { - let items = builder.collect(); - return Ok(HealthyChildItems::One(items)); + let info = builder.context().nexus_info().clone(); + if let Some(info_inner) = &builder.context().nexus_info() { + if !info_inner.clean_shutdown { + return Ok(HealthyChildItems::One(info, builder.collect())); } } let items = builder.collect(); - Ok(HealthyChildItems::All(items)) + Ok(HealthyChildItems::All(info, items)) } diff --git a/control-plane/agents/core/src/volume/specs.rs b/control-plane/agents/core/src/volume/specs.rs index ff7d216dd..d8cad2b83 100644 --- a/control-plane/agents/core/src/volume/specs.rs +++ b/control-plane/agents/core/src/volume/specs.rs @@ -139,7 +139,10 @@ pub(crate) async fn get_volume_replica_candidates( }); } - request.trace(&format!("Creation pool candidates for volume: {:?}", pools)); + request.trace(&format!( + "Creation pool candidates for volume: {:?}", + pools.iter().map(|p| p.state()).collect::>() + )); Ok(pools .iter() @@ -924,8 +927,8 @@ impl ResourceSpecsLocked { ) -> Result { let children = get_healthy_volume_replicas(vol_spec, target_node, registry).await?; let (count, items) = match children { - HealthyChildItems::One(candidates) => (1, candidates), - HealthyChildItems::All(candidates) => (candidates.len(), candidates), + HealthyChildItems::One(_, candidates) => (1, candidates), + HealthyChildItems::All(_, candidates) => (candidates.len(), candidates), }; let mut nexus_replicas = vec![]; @@ -1419,7 +1422,7 @@ async fn get_volume_target_node( let node = locked_node.read().await; // todo: use other metrics in order to make the "best" choice if node.is_online() { - return Ok(node.id.clone()); + return Ok(node.id().clone()); } } Err(SvcError::NoNodes {}) @@ -1430,10 +1433,10 @@ async fn get_volume_target_node( let node = registry.get_node_wrapper(node).await?; let node = node.read().await; if node.is_online() { - Ok(node.id.clone()) + Ok(node.id().clone()) } else { Err(SvcError::NodeNotOnline { - node: node.id.clone(), + node: node.id().clone(), }) } } diff --git a/control-plane/agents/core/src/volume/tests.rs b/control-plane/agents/core/src/volume/tests.rs index 0976af6a7..903d0fe6d 100644 --- a/control-plane/agents/core/src/volume/tests.rs +++ b/control-plane/agents/core/src/volume/tests.rs @@ -113,13 +113,14 @@ async fn volume_nexus_reconcile() { #[tokio::test] async fn garbage_collection() { + let reconcile_period = Duration::from_millis(500); let cluster = ClusterBuilder::builder() .with_rest(true) .with_agents(vec!["core"]) .with_mayastors(3) .with_tmpfs_pool(POOL_SIZE_BYTES) .with_cache_period("1s") - .with_reconcile_period(Duration::from_millis(500), Duration::from_millis(500)) + .with_reconcile_period(reconcile_period, reconcile_period) .build() .await .unwrap(); @@ -128,6 +129,89 @@ async fn garbage_collection() { unused_nexus_reconcile(&cluster).await; unused_reconcile(&cluster).await; + offline_replicas_reconcile(&cluster, reconcile_period).await; +} + +async fn offline_replicas_reconcile(cluster: &Cluster, reconcile_period: Duration) { + let rest_api = cluster.rest_v00(); + let volumes_api = rest_api.volumes_api(); + + let volume = volumes_api + .put_volume( + &"1e3cf927-80c2-47a8-adf0-95c481bdd7b7".parse().unwrap(), + models::CreateVolumeBody::new(models::VolumePolicy::default(), 2, 5242880u64), + ) + .await + .unwrap(); + + let nodes = rest_api.nodes_api().get_nodes().await.unwrap(); + let replica_nodes = 
rest_api.replicas_api().get_replicas().await.unwrap(); + let replica_nodes = replica_nodes + .into_iter() + .map(|r| r.node) + .collect::>(); + let free_node = nodes + .into_iter() + .find_map(|n| { + if replica_nodes.iter().all(|repl_node| repl_node != &n.id) { + Some(n.id) + } else { + None + } + }) + .unwrap(); + + // 1. publish on the node with no replicas + let volume = volumes_api + .put_volume_target( + &volume.spec.uuid, + &free_node, + models::VolumeShareProtocol::Nvmf, + ) + .await + .unwrap(); + + tracing::info!("Volume: {:?}", volume); + let volume_state = volume.state; + + let volume_id = volume_state.uuid; + let volume = volumes_api.get_volume(&volume_id).await.unwrap(); + assert_eq!(volume.state.status, models::VolumeStatus::Online); + + // 2. kill all replica nodes + for node in replica_nodes.iter().filter(|n| n != &&free_node) { + cluster.composer().stop(node).await.unwrap(); + } + + // 3. wait for volume to become Faulted + wait_till_volume_status(cluster, &volume.spec.uuid, models::VolumeStatus::Faulted).await; + + // 4. restart the core-agent + cluster.restart_core().await; + + Liveness::default() + .request_on(ChannelVs::Volume) + .await + .expect("Should have restarted by now"); + wait_till_volume_status(cluster, &volume.spec.uuid, models::VolumeStatus::Faulted).await; + + // 5. After the reconcilers run, replicas should not have been disowned + tokio::time::sleep(reconcile_period * 3).await; + + let volume = volumes_api.get_volume(&volume.spec.uuid).await.unwrap(); + assert_eq!(volume.state.status, models::VolumeStatus::Faulted); + + let replicas = rest_api.specs_api().get_specs().await.unwrap().replicas; + assert_eq!(replicas.len(), 2); + assert_eq!( + replicas + .iter() + .filter(|r| r.owners.volume.as_ref() == Some(&volume.spec.uuid)) + .count(), + 2 + ); + + volumes_api.del_volume(&volume_id).await.unwrap(); } async fn unused_nexus_reconcile(cluster: &Cluster) { diff --git a/control-plane/csi-controller/src/main.rs b/control-plane/csi-controller/src/main.rs index 65b18d56f..d87828c50 100644 --- a/control-plane/csi-controller/src/main.rs +++ b/control-plane/csi-controller/src/main.rs @@ -78,6 +78,7 @@ pub async fn main() -> Result<(), String> { git_version::git_version!(args = ["--abbrev=12", "--always"]), env!("CARGO_PKG_VERSION"), ); + common_lib::opentelemetry::set_jaeger_env(); let tracer = opentelemetry_jaeger::new_pipeline() .with_agent_endpoint(jaeger) .with_service_name("csi-controller") @@ -97,10 +98,12 @@ pub async fn main() -> Result<(), String> { CsiControllerConfig::get_config().rest_endpoint() ); - server::CsiServer::run( + let result = server::CsiServer::run( args.value_of("socket") .expect("CSI socket must be specfied") .to_string(), ) - .await + .await; + global::shutdown_tracer_provider(); + result } diff --git a/control-plane/msp-operator/src/main.rs b/control-plane/msp-operator/src/main.rs index 9df43941f..757849a6b 100644 --- a/control-plane/msp-operator/src/main.rs +++ b/control-plane/msp-operator/src/main.rs @@ -1029,6 +1029,7 @@ async fn main() -> anyhow::Result<()> { env!("CARGO_PKG_VERSION"), ); global::set_text_map_propagator(TraceContextPropagator::new()); + constants::set_jaeger_env(); let tracer = opentelemetry_jaeger::new_pipeline() .with_agent_endpoint(jaeger) .with_service_name("msp-operator") diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs index e45931b11..859aa6dc8 100644 --- a/control-plane/rest/service/src/main.rs +++ b/control-plane/rest/service/src/main.rs @@ -109,6 +109,7 @@ fn 
init_tracing() -> Option { tracing::info!("Starting jaeger trace pipeline at {}...", agent); // Start a new jaeger trace pipeline global::set_text_map_propagator(TraceContextPropagator::new()); + common_lib::opentelemetry::set_jaeger_env(); let tracer = opentelemetry_jaeger::new_pipeline() .with_agent_endpoint(agent) .with_service_name("rest-server") @@ -220,12 +221,16 @@ async fn main() -> anyhow::Result<()> { ) .await; let server = HttpServer::new(app).bind_rustls(CliArgs::args().https, get_certificates()?)?; - if let Some(http) = CliArgs::args().http { + let result = if let Some(http) = CliArgs::args().http { server.bind(http).map_err(anyhow::Error::from)? } else { server } .run() .await - .map_err(|e| e.into()) + .map_err(|e| e.into()); + + global::shutdown_tracer_provider(); + + result } diff --git a/control-plane/rest/tests/v0_test.rs b/control-plane/rest/tests/v0_test.rs index 8fb29e518..42412b77e 100644 --- a/control-plane/rest/tests/v0_test.rs +++ b/control-plane/rest/tests/v0_test.rs @@ -59,7 +59,7 @@ fn bearer_token() -> String { #[tokio::test] async fn client() { // Run the client test both with and without authentication. - for auth in &[true, false] { + for auth in &[false, true] { let cluster = test_setup(auth).await; client_test(&cluster, auth).await; } @@ -221,7 +221,7 @@ async fn client_test(cluster: &Cluster, auth: &bool) { } ); - let mut child = client + let child = client .children_api() .put_node_nexus_child( &nexus.node, @@ -236,14 +236,16 @@ async fn client_test(cluster: &Cluster, auth: &bool) { .get_nexus_children(&nexus.uuid) .await .unwrap(); + let child_updated = children.iter().find(|c| c.uri == child.uri); // It's possible that the rebuild progress will change between putting a child and getting the - // list of children. Just check that they are both rebuilding and then set them to the same - // thing so that we can compare them in subsequent asserts. 
+ // list of children so don't bother comparing the states assert!(child.rebuild_progress.is_some()); - assert!(children.last().unwrap().rebuild_progress.is_some()); - child.rebuild_progress = children.last().unwrap().rebuild_progress; - assert_eq!(Some(&child), children.last()); + assert!(child_updated.is_some()); + assert!( + child_updated.unwrap().rebuild_progress.is_some() + || child_updated.unwrap().state == models::ChildState::Online + ); client .nexuses_api() diff --git a/deploy/core-agents-deployment.yaml b/deploy/core-agents-deployment.yaml index 797a270b8..1a00f1aef 100644 --- a/deploy/core-agents-deployment.yaml +++ b/deploy/core-agents-deployment.yaml @@ -23,13 +23,15 @@ spec: - command: - sh - -c - - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep + 1; done; image: busybox:latest name: nats-probe - command: - sh - -c - - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; + sleep 1; done; image: busybox:latest name: etcd-probe containers: @@ -41,7 +43,7 @@ spec: requests: cpu: 500m memory: 16Mi - image: mayadata/mcp-core:v1.0.0 + image: mayadata/mcp-core:v1.0.1 imagePullPolicy: IfNotPresent args: - "-smayastor-etcd" diff --git a/deploy/csi-deployment.yaml b/deploy/csi-deployment.yaml index 7bcc1e974..7b5b74d02 100644 --- a/deploy/csi-deployment.yaml +++ b/deploy/csi-deployment.yaml @@ -26,7 +26,8 @@ spec: - command: - sh - -c - - trap "exit 1" TERM; until nc -vz rest 8081; do echo "Waiting for REST API endpoint to become available"; sleep 1; done; + - trap "exit 1" TERM; until nc -vz rest 8081; do echo "Waiting for REST API endpoint + to become available"; sleep 1; done; image: busybox:latest name: rest-probe containers: @@ -65,11 +66,11 @@ spec: requests: cpu: 16m memory: 64Mi - image: mayadata/mcp-csi-controller:v1.0.0 + image: mayadata/mcp-csi-controller:v1.0.1 imagePullPolicy: IfNotPresent args: - "--csi-socket=/var/lib/csi/sockets/pluginproxy/csi.sock" - - "--rest-endpoint=http://$(REST_SERVICE_HOST):8081" + - "--rest-endpoint=http://rest:8081" env: - name: RUST_LOG value: info diff --git a/deploy/jaeger-operator/deployment.yaml b/deploy/jaeger-operator/deployment.yaml index 690dbdf1a..36549d512 100644 --- a/deploy/jaeger-operator/deployment.yaml +++ b/deploy/jaeger-operator/deployment.yaml @@ -45,14 +45,3 @@ spec: value: "mayastor-jaeger-operator" resources: {} - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: node-role.kubernetes.io/master - operator: In - values: - - "" - tolerations: - - key: node-role.kubernetes.io/master diff --git a/deploy/jaeger-operator/jaeger.yaml b/deploy/jaeger-operator/jaeger.yaml index edf7a5ec5..722e2e5ae 100644 --- a/deploy/jaeger-operator/jaeger.yaml +++ b/deploy/jaeger-operator/jaeger.yaml @@ -9,17 +9,6 @@ spec: strategy: allInOne ingress: enabled: false - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: node-role.kubernetes.io/master - operator: In - values: - - "" - tolerations: - - key: node-role.kubernetes.io/master query: serviceType: NodePort nodePort: 30012 diff --git a/deploy/msp-deployment.yaml b/deploy/msp-deployment.yaml index 7dccfaad4..a55496b62 100644 --- a/deploy/msp-deployment.yaml +++ 
b/deploy/msp-deployment.yaml @@ -24,13 +24,15 @@ spec: - command: - sh - -c - - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep + 1; done; image: busybox:latest name: nats-probe - command: - sh - -c - - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; + sleep 1; done; image: busybox:latest name: etcd-probe containers: @@ -42,10 +44,10 @@ spec: requests: cpu: 50m memory: 16Mi - image: mayadata/mcp-msp-operator:v1.0.0 + image: mayadata/mcp-msp-operator:v1.0.1 imagePullPolicy: IfNotPresent args: - - "-e http://$(REST_SERVICE_HOST):8081" + - "-e http://rest:8081" - "--interval=30s" env: - name: RUST_LOG diff --git a/deploy/rest-deployment.yaml b/deploy/rest-deployment.yaml index 197810c9b..27c41307a 100644 --- a/deploy/rest-deployment.yaml +++ b/deploy/rest-deployment.yaml @@ -23,13 +23,15 @@ spec: - command: - sh - -c - - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz nats 4222; do echo "Waiting for nats..."; sleep + 1; done; image: busybox:latest name: nats-probe - command: - sh - -c - - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; sleep 1; done; + - trap "exit 1" TERM; until nc -vz mayastor-etcd 2379; do echo "Waiting for etcd..."; + sleep 1; done; image: busybox:latest name: etcd-probe containers: @@ -41,7 +43,7 @@ spec: requests: cpu: 50m memory: 32Mi - image: mayadata/mcp-rest:v1.0.0 + image: mayadata/mcp-rest:v1.0.1 imagePullPolicy: IfNotPresent args: - "--dummy-certificates" diff --git a/deploy/terraform/mod/csi-controller/main.tf b/deploy/terraform/mod/csi-controller/main.tf index f4ee21c4c..c1164acab 100644 --- a/deploy/terraform/mod/csi-controller/main.tf +++ b/deploy/terraform/mod/csi-controller/main.tf @@ -87,7 +87,7 @@ resource "kubernetes_deployment" "deployment_csi_controller" { args = concat([ "--csi-socket=/var/lib/csi/sockets/pluginproxy/csi.sock", - "--rest-endpoint=http://$(REST_SERVICE_HOST):8081", + "--rest-endpoint=http://rest:8081", ], var.jaeger_agent_argument) env { name = "RUST_LOG" diff --git a/deploy/terraform/mod/k8s-operator/main.tf b/deploy/terraform/mod/k8s-operator/main.tf index 8201d9a70..7fd723db3 100644 --- a/deploy/terraform/mod/k8s-operator/main.tf +++ b/deploy/terraform/mod/k8s-operator/main.tf @@ -33,7 +33,7 @@ resource "kubernetes_deployment" "deployment_msp_operator" { service_account_name = "mayastor-service-account" container { args = concat([ - "-e http://$(REST_SERVICE_HOST):8081", + "-e http://rest:8081", "--interval=${var.cache_period}" ], var.jaeger_agent_argument diff --git a/deployer/src/infra/csi.rs b/deployer/src/infra/csi.rs index 63afd8356..57edad8a6 100644 --- a/deployer/src/infra/csi.rs +++ b/deployer/src/infra/csi.rs @@ -18,16 +18,25 @@ impl ComponentAction for Csi { } else { if options.build { std::process::Command::new("cargo") - .args(&["build", "-p", "rest", "--bin", "rest"]) + .args(&["build", "-p", "csi-controller", "--bin", "csi-controller"]) .status()?; } - let binary = Binary::from_dbg("csi-controller") + let mut binary = Binary::from_dbg("csi-controller") .with_args(vec!["--rest-endpoint", "http://rest:8081"]) // Make sure that CSI socket is always under shared directory // regardless of what its default value is. 
.with_args(vec!["--csi-socket", CSI_SOCKET]); + if cfg.container_exists("jaeger") { + let jaeger_config = format!("jaeger.{}:6831", cfg.get_name()); + binary = binary.with_args(vec!["--jaeger", &jaeger_config]) + }; + + if let Some(size) = &options.otel_max_batch_size { + binary = binary.with_env("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", size); + } + cfg.add_container_spec( ContainerSpec::from_binary("csi-controller", binary) .with_bypass_default_mounts(true) diff --git a/deployer/src/infra/mod.rs b/deployer/src/infra/mod.rs index 4cb3e772a..65b6c81c1 100644 --- a/deployer/src/infra/mod.rs +++ b/deployer/src/infra/mod.rs @@ -107,6 +107,7 @@ macro_rules! impl_ctrlp_agents { binary = binary.with_env(kv.key.as_str(), kv.value.as_str().as_ref()); } } + if name == "core" { let etcd = format!("etcd.{}:2379", options.cluster_label.name()); binary = binary.with_args(vec!["--store", &etcd]); @@ -142,6 +143,9 @@ macro_rules! impl_ctrlp_agents { binary = binary.with_args(vec!["--jaeger", &jaeger_config]); } } + if let Some(size) = &options.otel_max_batch_size { + binary = binary.with_env("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", size); + } Ok(cfg.add_container_bin(&name, binary)) } async fn start(&self, _options: &StartOptions, cfg: &ComposeTest) -> Result<(), Error> { diff --git a/deployer/src/infra/rest.rs b/deployer/src/infra/rest.rs index 1d68fce26..e3dc2cc4c 100644 --- a/deployer/src/infra/rest.rs +++ b/deployer/src/infra/rest.rs @@ -39,6 +39,10 @@ impl ComponentAction for Rest { binary = binary.with_args(vec!["--jaeger", &jaeger_config]) }; + if let Some(size) = &options.otel_max_batch_size { + binary = binary.with_env("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", size); + } + cfg.add_container_spec( ContainerSpec::from_binary("rest", binary) .with_portmap("8080", "8080") diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index feb6d6942..9c33074ca 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -216,6 +216,10 @@ pub struct StartOptions { #[structopt(long)] pub reconcile_idle_period: Option, + /// Override the core agent's reconcile idle period + #[structopt(long, env = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE")] + pub otel_max_batch_size: Option, + /// Amount of time to wait for all containers to start. #[structopt(short, long)] pub wait_timeout: Option, diff --git a/tests/bdd/common/apiclient.py b/tests/bdd/common/apiclient.py index 07a119547..dbbfda868 100644 --- a/tests/bdd/common/apiclient.py +++ b/tests/bdd/common/apiclient.py @@ -3,6 +3,7 @@ from openapi.api.specs_api import SpecsApi from openapi.api.replicas_api import ReplicasApi from openapi.api.nodes_api import NodesApi +from openapi.api.nexuses_api import NexusesApi from openapi import api_client from openapi import configuration @@ -48,3 +49,8 @@ def nodes_api(): @staticmethod def replicas_api(): return ReplicasApi(get_api_client()) + + # Return a NexusesApi object which can be used for performing nexus related REST calls. 
+ @staticmethod + def nexuses_api(): + return NexusesApi(get_api_client()) diff --git a/tests/bdd/features/volume/nexus.feature b/tests/bdd/features/volume/nexus.feature new file mode 100644 index 000000000..6ebcc62d1 --- /dev/null +++ b/tests/bdd/features/volume/nexus.feature @@ -0,0 +1,12 @@ +Feature: Managing a Volume Nexus Target + + Background: + Given a control plane, two Mayastor instances, two pools + + Scenario: the target nexus is faulted + Given a published self-healing volume + When its nexus target is faulted + And one or more of its healthy replicas are back online + Then the nexus target shall be removed from its associated node + And it shall be recreated + diff --git a/tests/bdd/setup.sh b/tests/bdd/setup.sh index b93c1d496..2b9f95b28 100755 --- a/tests/bdd/setup.sh +++ b/tests/bdd/setup.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -DIR_NAME="$(dirname "$(pwd)/${BASH_SOURCE[0]}")" +DIR_NAME="$(dirname "$0")" virtualenv --no-setuptools "$DIR_NAME"/venv diff --git a/tests/bdd/test_replicas_garbage_collection.py b/tests/bdd/test_replicas_garbage_collection.py index 3ba43aca8..78a448765 100644 --- a/tests/bdd/test_replicas_garbage_collection.py +++ b/tests/bdd/test_replicas_garbage_collection.py @@ -28,9 +28,9 @@ MAYASTOR_1 = "mayastor-1" MAYASTOR_2 = "mayastor-2" -POOL_DISK1 = "disk1.img" +POOL_DISK1 = "cdisk1.img" POOL1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" -POOL_DISK2 = "disk2.img" +POOL_DISK2 = "cdisk2.img" POOL2_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" diff --git a/tests/bdd/test_volume_nexus.py b/tests/bdd/test_volume_nexus.py new file mode 100644 index 000000000..e9340cda7 --- /dev/null +++ b/tests/bdd/test_volume_nexus.py @@ -0,0 +1,167 @@ +"""Managing a Volume Nexus Target feature tests.""" +import os + +from pytest_bdd import given, scenario, then, when +import pytest +from retrying import retry + +from common.deployer import Deployer +from common.apiclient import ApiClient +from common.docker import Docker + +from openapi.model.create_pool_body import CreatePoolBody +from openapi.model.create_volume_body import CreateVolumeBody +from openapi.model.protocol import Protocol +from openapi.model.volume_policy import VolumePolicy +from openapi.model.nexus_state import NexusState +from openapi.model.topology import Topology +from openapi.model.pool_topology import PoolTopology +from openapi.model.labelled_topology import LabelledTopology + + +@scenario("features/volume/nexus.feature", "the target nexus is faulted") +def test_the_target_nexus_is_faulted(): + """the target nexus is faulted.""" + + +@given("a control plane, two Mayastor instances, two pools") +def a_control_plane_two_mayastor_instances_two_pools(init): + """a control plane, two Mayastor instances, two pools.""" + + +@given("a published self-healing volume") +def a_published_selfhealing_volume(): + """a published self-healing volume.""" + request = CreateVolumeBody( + VolumePolicy(True), + NUM_VOLUME_REPLICAS, + VOLUME_SIZE, + topology=Topology( + pool_topology=PoolTopology( + labelled=LabelledTopology(exclusion={}, inclusion={"node": MAYASTOR_2}) + ) + ), + ) + ApiClient.volumes_api().put_volume(VOLUME_UUID, request) + ApiClient.volumes_api().put_volume_target(VOLUME_UUID, MAYASTOR_1, Protocol("nvmf")) + + +@when("its nexus target is faulted") +def its_nexus_target_is_faulted(): + """its nexus target is faulted.""" + # Kill remote mayastor instance, which should fault the volume nexus child + Docker.kill_container(MAYASTOR_2) + check_target_faulted() + + +@when("one or more of its healthy replicas are 
back online") +def one_or_more_of_its_healthy_replicas_are_back_online(): + """one or more of its healthy replicas are back online.""" + # Brings back the mayastor instance, which should expose the replicas upon pool import + Docker.restart_container(MAYASTOR_2) + check_replicas_online() + + +@then("it shall be recreated") +def it_shall_be_recreated(): + """it shall be recreated.""" + check_nexus_online() + + +@then("the nexus target shall be removed from its associated node") +def the_nexus_target_shall_be_removed_from_its_associated_node(): + """the nexus shall be removed from its associated node.""" + check_nexus_removed() + + +""" ... """ + +VOLUME_UUID = "5cd5378e-3f05-47f1-a830-a0f5873a1449" +VOLUME_SIZE = 10485761 +NUM_VOLUME_REPLICAS = 1 + +MAYASTOR_1 = "mayastor-1" +MAYASTOR_2 = "mayastor-2" + +POOL_DISK1 = "cdisk1.img" +POOL1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" +POOL_DISK2 = "cdisk2.img" +POOL2_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" + + +@pytest.fixture(scope="function") +def create_pool_disk_images(): + # When starting Mayastor instances with the deployer a bind mount is created from /tmp to + # /host/tmp, so create disk images in /tmp + for disk in [POOL_DISK1, POOL_DISK2]: + path = f"/tmp/{disk}" + with open(path, "w") as file: + file.truncate(20 * 1024 * 1024) + + yield + for disk in [POOL_DISK1, POOL_DISK2]: + path = f"/tmp/{disk}" + if os.path.exists(path): + os.remove(path) + + +@pytest.fixture(scope="function") +def init(create_pool_disk_images): + # Shorten the reconcile periods and cache period to speed up the tests. + Deployer.start_with_args( + [ + "-j", + "-m=2", + "-w=10s", + "--reconcile-idle-period=500ms", + "--reconcile-period=500ms", + "--cache-period=1s", + ] + ) + + # Create pools + ApiClient.pools_api().put_node_pool( + MAYASTOR_2, + POOL2_UUID, + CreatePoolBody([f"aio:///host/tmp/{POOL_DISK2}"], labels={"node": MAYASTOR_2}), + ) + + yield + Deployer.stop() + + +@retry(wait_fixed=200, stop_max_delay=5000) +def check_target_faulted(): + volume = ApiClient.volumes_api().get_volume(VOLUME_UUID) + assert NexusState(volume.state.target["state"]) == NexusState("Faulted") + + +@retry(wait_fixed=200, stop_max_attempt_number=30) +def check_nexus_online(): + volume = ApiClient.volumes_api().get_volume(VOLUME_UUID) + nexus_uuid = volume.state.target["uuid"] + nexus = ApiClient.nexuses_api().get_nexus(nexus_uuid) + assert nexus.state == NexusState("Online") + + +@retry(wait_fixed=200, stop_max_delay=5000) +def check_nexus_removed(): + volume = ApiClient.volumes_api().get_volume(VOLUME_UUID) + assert ( + not hasattr(volume.state, "target") + # or it might have been recreated if we lost the "race"... + or NexusState(volume.state.target["state"]) == NexusState("Online") + or NexusState(volume.state.target["state"]) == NexusState("Degraded") + ) + + +@retry(wait_fixed=200, stop_max_delay=5000) +def check_replicas_online(): + volume = ApiClient.volumes_api().get_volume(VOLUME_UUID) + online_replicas = list( + filter( + lambda uuid: str(volume.state.replica_topology[uuid].state) == "Online", + list(volume.state.replica_topology), + ) + ) + assert len(online_replicas) > 0