chore(bors): merge pull request #206
206: fix: sending node online event should not block r=tiagolobocastro a=tiagolobocastro

On startup, if the cluster has more than 5 nodes, sending an event notification per node would block,
because the event queue holds only 5 elements.
Simply increasing the queue size would not necessarily help, so instead we drop the notification when
the queue is full. This is acceptable because we currently have no way of "flattening" events or of
avoiding running the reconcilers too often, so we don't want a large backlog of notifications to build
up. Losing a notification is not a problem; it just means the reconcilers may react a little later.

Co-authored-by: Tiago Castro <[email protected]>
mayastor-bors and tiagolobocastro committed Jun 7, 2022
2 parents 62f82ea + fb0af69 commit 2230eb9
Showing 53 changed files with 1,236 additions and 504 deletions.
1 change: 1 addition & 0 deletions .github/workflows/release_artifacts.yml
@@ -4,6 +4,7 @@ on:
branches:
- master
- 'release/**'
- 'hotfix-v**'

jobs:
kubectl-plugin:
7 changes: 0 additions & 7 deletions .pre-commit-config.yaml
@@ -53,10 +53,3 @@ repos:
args: [ "--changes" ]
pass_filenames: false
language: system
- id: helm-develop-deploy
name: Helm Generator
description: Ensures the deploy is updated with the develop yamls
entry: ./scripts/generate-deploy-yamls.sh
args: [ "-c", "develop" ]
pass_filenames: false
language: system
2 changes: 1 addition & 1 deletion chart/templates/csi-deployment.yaml
@@ -57,7 +57,7 @@ spec:
imagePullPolicy: {{ .Values.mayastorCP.pullPolicy }}
args:
- "--csi-socket=/var/lib/csi/sockets/pluginproxy/csi.sock"
- "--rest-endpoint=http://$(REST_SERVICE_HOST):8081"{{ if .Values.base.jaeger.enabled }}
- "--rest-endpoint=http://rest:8081"{{ if .Values.base.jaeger.enabled }}
- "--jaeger={{ .Values.base.jaeger.agent.name }}:{{ .Values.base.jaeger.agent.port }}"{{ end }}
env:
- name: RUST_LOG
2 changes: 1 addition & 1 deletion chart/templates/msp-deployment.yaml
@@ -26,7 +26,7 @@ spec:
image: {{ .Values.mayastorCP.registry }}mayadata/mcp-msp-operator:{{ .Values.mayastorCP.tag }}
imagePullPolicy: {{ .Values.mayastorCP.pullPolicy }}
args:
- "-e http://$(REST_SERVICE_HOST):8081"
- "-e http://rest:8081"
- "--interval={{ .Values.base.cache_poll_period }}"{{ if .Values.base.jaeger.enabled }}
- "--jaeger={{ .Values.base.jaeger.agent.name }}:{{ .Values.base.jaeger.agent.port }}"{{ end }}
env:
15 changes: 15 additions & 0 deletions common/src/opentelemetry.rs
@@ -19,3 +19,18 @@ pub fn default_tracing_tags(git_commit: &str, cargo_version: &str) -> Vec<KeyVal
KeyValue::new("crate.version", cargo_version.to_string()),
]
}

/// Name of the OTEL_BSP_MAX_EXPORT_BATCH_SIZE variable
pub const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_NAME: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// The value of OTEL_BSP_MAX_EXPORT_BATCH_SIZE to be used with JAEGER
pub const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_JAEGER: &str = "64";
/// Set the OTEL variables for a jaeger configuration
pub fn set_jaeger_env() {
// if not set, default it to our jaeger value
if std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_NAME).is_err() {
std::env::set_var(
OTEL_BSP_MAX_EXPORT_BATCH_SIZE_NAME,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE_JAEGER,
);
}
}
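
A minimal sketch of how a binary might rely on this helper's behaviour: the variable has to be
defaulted before the OpenTelemetry pipeline is built, because the batch span processor reads it at
construction time. The standalone `main` and the exact call site are assumptions; only the env-var
defaulting mirrors the code above.

```rust
use std::env;

// Default OTEL_BSP_MAX_EXPORT_BATCH_SIZE to 64 before the tracer pipeline
// is installed (assumed call site; the agents may wire this up differently).
fn main() {
    const NAME: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
    if env::var(NAME).is_err() {
        env::set_var(NAME, "64");
    }
    // The batch span processor picks this value up when the pipeline is built,
    // so the defaulting must happen first.
    println!("{}={}", NAME, env::var(NAME).unwrap());
}
```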
2 changes: 1 addition & 1 deletion common/src/store/etcd_keep_alive.rs
@@ -507,7 +507,7 @@ impl LeaseLockKeeperClocking<Locked> for EtcdSingletonLock {

#[async_trait::async_trait]
impl LeaseLockKeeperClocking<KeepAlive> for EtcdSingletonLock {
#[tracing::instrument(skip(self, state), err)]
#[tracing::instrument(level = "trace", skip(self, state), err)]
async fn clock(&mut self, mut state: KeepAlive) -> LockStatesResult {
state
.keeper
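
For reference, a self-contained sketch of the `level = "trace"` change above: the span is only
recorded when the subscriber's filter allows TRACE, which keeps routine keep-alive clocking out of
normal logs. The `renew_lease` function and its argument are made up for the illustration; only the
attribute usage mirrors the diff.

```rust
use tracing::Level;

// Hypothetical function instrumented like the keep-alive clock: the span is
// created at TRACE level, and `err` additionally records the error when the
// function returns Err.
#[tracing::instrument(level = "trace", err)]
fn renew_lease(attempt: u32) -> Result<(), String> {
    if attempt == 0 {
        return Err("no attempts left".to_string());
    }
    Ok(())
}

fn main() {
    // With a TRACE filter the span is recorded; under a coarser filter (e.g.
    // INFO) the trace-level span is skipped.
    tracing_subscriber::fmt().with_max_level(Level::TRACE).init();
    let _ = renew_lease(0);
}
```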
1 change: 1 addition & 0 deletions common/src/types/v0/message_bus/replica.rs
@@ -158,6 +158,7 @@ pub struct CreateReplica {
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct ReplicaOwners {
volume: Option<VolumeId>,
#[serde(skip)]
nexuses: Vec<NexusId>,
}
impl ReplicaOwners {
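
A standalone sketch of what the `#[serde(skip)]` annotation changes: the `nexuses` field no longer
appears in the serialized form and is rebuilt as an empty list on load, while `volume` still
round-trips. The simplified field types are illustrative stand-ins, not the real `VolumeId`/`NexusId`
types.

```rust
use serde::{Deserialize, Serialize};

// Illustration of the skip behaviour with simplified field types.
#[derive(Serialize, Deserialize, Default, Debug)]
struct ReplicaOwners {
    volume: Option<String>,
    #[serde(skip)]
    nexuses: Vec<String>,
}

fn main() -> Result<(), serde_json::Error> {
    let owners = ReplicaOwners {
        volume: Some("volume-1".into()),
        nexuses: vec!["nexus-1".into()],
    };
    // `nexuses` is absent from the serialized output and defaults to empty
    // when deserializing.
    println!("{}", serde_json::to_string(&owners)?); // {"volume":"volume-1"}
    Ok(())
}
```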
4 changes: 4 additions & 0 deletions common/src/types/v0/store/nexus_persistence.rs
@@ -26,6 +26,10 @@ impl NexusInfo {
None => false,
}
}
/// Check if no replica is healthy
pub fn no_healthy_replicas(&self) -> bool {
self.children.iter().all(|c| !c.healthy) || self.children.is_empty()
}
}

/// Definition of the child information that gets saved in the persistent
8 changes: 8 additions & 0 deletions control-plane/agents/common/src/errors.rs
@@ -186,6 +186,8 @@ pub enum SvcError {
ReplicaCreateNumber { id: String },
#[snafu(display("No online replicas are available for Volume '{}'", id))]
NoOnlineReplicas { id: String },
#[snafu(display("No healthy replicas are available for Volume '{}'", id))]
NoHealthyReplicas { id: String },
#[snafu(display("Entry with key '{}' not found in the persistent store.", key))]
StoreMissingEntry { key: String },
#[snafu(display("The uuid '{}' for kind '{}' is not valid.", uuid, kind.to_string()))]
@@ -514,6 +516,12 @@ impl From<SvcError> for ReplyError {
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::NoHealthyReplicas { .. } => ReplyError {
kind: ReplyErrorKind::VolumeNoReplicas,
resource: ResourceKind::Volume,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::ReplicaCreateNumber { .. } => ReplyError {
kind: ReplyErrorKind::ReplicaCreateNumber,
resource: ResourceKind::Volume,
57 changes: 48 additions & 9 deletions control-plane/agents/core/src/core/grpc.rs
@@ -56,13 +56,27 @@ impl GrpcContext {
comms_timeouts: comms_timeouts.clone(),
})
}
pub(crate) async fn lock(&self) -> tokio::sync::OwnedMutexGuard<()> {
/// Override the timeout config in the context for the given request
fn override_timeout<R: MessageIdTimeout>(&mut self, request: Option<R>) {
let timeout = request
.map(|r| r.timeout(self.comms_timeouts.request(), &bus()))
.unwrap_or_else(|| self.comms_timeouts.request());

self.endpoint = self
.endpoint
.clone()
.connect_timeout(self.comms_timeouts.connect() + Duration::from_millis(500))
.timeout(timeout);
}
pub(crate) async fn lock(&self) -> GrpcLockGuard {
self.lock.clone().lock_owned().await
}
pub(crate) async fn connect(&self) -> Result<GrpcClient, SvcError> {
GrpcClient::new(self).await
}
pub(crate) async fn connect_locked(&self) -> Result<GrpcClientLocked, SvcError> {
pub(crate) async fn connect_locked(
&self,
) -> Result<GrpcClientLocked, (GrpcLockGuard, SvcError)> {
GrpcClientLocked::new(self).await
}
}
@@ -72,7 +86,7 @@ impl GrpcContext {
pub(crate) struct GrpcClient {
context: GrpcContext,
/// gRPC Mayastor Client
pub(crate) client: MayaClient,
pub(crate) mayastor: MayaClient,
}
pub(crate) type MayaClient = MayastorClient<Channel>;
impl GrpcClient {
@@ -96,23 +110,48 @@ impl GrpcClient {

Ok(Self {
context: context.clone(),
client,
mayastor: client,
})
}
}

/// Wrapper over all gRPC Clients types with implicit locking for serialization
/// Async Lock guard for gRPC operations.
/// It's used by the GrpcClientLocked to ensure there's only one operation in progress
/// at a time while still allowing for multiple gRPC clients.
type GrpcLockGuard = tokio::sync::OwnedMutexGuard<()>;

/// Wrapper over all gRPC Clients types with implicit locking for serialization.
pub(crate) struct GrpcClientLocked {
/// gRPC auto CRUD guard lock
_lock: tokio::sync::OwnedMutexGuard<()>,
_lock: GrpcLockGuard,
client: GrpcClient,
}
impl GrpcClientLocked {
pub(crate) async fn new(context: &GrpcContext) -> Result<Self, SvcError> {
let client = GrpcClient::new(context).await?;
/// Create new locked client from the given context
/// A connection is established with the timeouts specified from the context.
/// Only one `Self` is allowed at a time by making use of a lock guard.
pub(crate) async fn new(context: &GrpcContext) -> Result<Self, (GrpcLockGuard, SvcError)> {
let _lock = context.lock().await;

let client = match GrpcClient::new(context).await {
Ok(client) => client,
Err(error) => return Err((_lock, error)),
};

Ok(Self { _lock, client })
}
/// Reconnect the client to use for the given request
/// This is useful when we want to issue the next gRPC using a different timeout
/// todo: tower should allow us to handle this better by keeping the same "backend" client
/// but modifying the timeout layer?
pub(crate) async fn reconnect<R: MessageIdTimeout>(self, request: R) -> Result<Self, SvcError> {
let mut context = self.context.clone();
context.override_timeout(Some(request));

let client = GrpcClient::new(&context).await?;

Ok(Self {
_lock: context.lock().await,
_lock: self._lock,
client,
})
}
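
A generic sketch of the locking pattern this hunk introduces, using plain tokio types rather than the
agent's own: a failed connection returns the owned lock guard together with the error, so the caller
still holds the lock and can decide what to do next. All names here are illustrative.

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

type LockGuard = OwnedMutexGuard<()>;

// A "locked client" owns the guard for as long as it lives, serializing access.
struct LockedClient {
    _lock: LockGuard,
    endpoint: String,
}

async fn connect_locked(
    lock: Arc<Mutex<()>>,
    endpoint: &str,
) -> Result<LockedClient, (LockGuard, String)> {
    let guard = lock.lock_owned().await;
    if endpoint.is_empty() {
        // Connection failed: hand the guard back so the caller keeps the lock.
        return Err((guard, "empty endpoint".to_string()));
    }
    Ok(LockedClient {
        _lock: guard,
        endpoint: endpoint.to_string(),
    })
}

#[tokio::main]
async fn main() {
    let lock = Arc::new(Mutex::new(()));
    match connect_locked(lock.clone(), "10.0.0.1:10124").await {
        Ok(client) => println!("connected to {}", client.endpoint),
        Err((_guard, error)) => println!("connect failed but lock retained: {error}"),
    }
}
```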
5 changes: 4 additions & 1 deletion control-plane/agents/core/src/core/reconciler/mod.rs
@@ -48,7 +48,10 @@ impl ReconcilerControl {
}

/// Send an event signal to the poller's main loop
/// todo: don't requeque duplicate events
pub(crate) async fn notify(&self, event: PollEvent) {
self.event_channel.send(event).await.ok();
if let Err(error) = self.event_channel.try_send(event) {
tracing::warn!(error=?error, "Failed to send event to reconcile worker");
}
}
}
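
A self-contained sketch of the drop-on-full behaviour described in the commit message, using a bounded
tokio channel; the capacity of 5 matches the description, while the event type and counts are
illustrative.

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum PollEvent {
    NodeOnline(u32),
}

#[tokio::main]
async fn main() {
    // Bounded queue of reconcile triggers.
    let (tx, mut rx) = mpsc::channel::<PollEvent>(5);

    // Simulate a startup burst larger than the queue capacity.
    for node in 0..8 {
        if let Err(error) = tx.try_send(PollEvent::NodeOnline(node)) {
            // With `send(..).await` this would have blocked; dropping the
            // event just means the reconcilers run slightly later.
            println!("dropping event: {error}");
        }
    }

    drop(tx);
    while let Some(event) = rx.recv().await {
        println!("reconcile trigger: {event:?}");
    }
}
```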
@@ -11,6 +11,7 @@ use common_lib::types::v0::{
use crate::core::task_poller::PollTriggerEvent;
use parking_lot::Mutex;
use std::sync::Arc;
use tracing::Instrument;

/// Nexus Garbage Collector reconciler
#[derive(Debug)]
@@ -110,13 +111,25 @@ async fn destroy_disowned_nexus(
if nexus_clone.managed && !nexus_clone.owned() {
let node_online = matches!(context.registry().get_node_wrapper(&nexus_clone.node).await, Ok(node) if node.read().await.is_online());
if node_online {
nexus_clone.warn_span(|| tracing::warn!("Attempting to destroy disowned nexus"));
let request = DestroyNexus::from(nexus_clone.clone());
context
.specs()
.destroy_nexus(context.registry(), &request, true, mode)
.await?;
nexus_clone.info_span(|| tracing::info!("Successfully destroyed disowned nexus"));
async {
nexus_clone.warn_span(|| tracing::warn!("Attempting to destroy disowned nexus"));
let request = DestroyNexus::from(nexus_clone.clone());
match context
.specs()
.destroy_nexus(context.registry(), &request, true, mode)
.await {
Ok(_) => {
nexus_clone.info_span(|| tracing::info!("Successfully destroyed disowned nexus"));
Ok(())
}
Err(error) => {
nexus_clone.error_span(|| tracing::error!(error = %error, "Failed to destroy disowned nexus"));
Err(error)
}
}
}
.instrument(tracing::info_span!("destroy_disowned_nexus", nexus.uuid = %nexus_clone.uuid, request.reconcile = true))
.await?;
}
}

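
A runnable sketch of the instrumentation pattern in this hunk: the destroy attempt is wrapped in an
async block that logs success or failure, and the block is attached to a span carrying the nexus uuid.
The `destroy_nexus` stub, its failure condition, and the uuid are placeholders.

```rust
use tracing::Instrument;

// Stand-in for the real destroy call.
async fn destroy_nexus(uuid: &str) -> Result<(), String> {
    if uuid.is_empty() {
        return Err("unknown nexus".to_string());
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let uuid = "5f7b1a2c";
    let result = async {
        tracing::warn!("Attempting to destroy disowned nexus");
        match destroy_nexus(uuid).await {
            Ok(()) => {
                tracing::info!("Successfully destroyed disowned nexus");
                Ok(())
            }
            Err(error) => {
                tracing::error!(error = %error, "Failed to destroy disowned nexus");
                Err(error)
            }
        }
    }
    // Every event inside the block carries the span's fields as context.
    .instrument(tracing::info_span!("destroy_disowned_nexus", nexus.uuid = %uuid))
    .await;

    println!("outcome: {result:?}");
}
```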