Commit
fix(perf): several perf tuning changes to decrease GET delays
The major change is to avoid fetching all volumes when we only want a single volume.
The other changes avoid cloning wherever possible and return references instead.
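
To make the intent concrete, here is a minimal, self-contained sketch of both patterns, assuming simplified stand-in types (`VolumeSpec`, `Specs`, `get_volume`) rather than the control plane's actual API: a keyed lookup returns a single entry without cloning the whole collection, and only the fields that are needed are copied out while the lock is held.

```rust
// Hedged sketch, not the project's real types: `VolumeSpec` and `Specs` are
// simplified stand-ins used only to illustrate the two patterns above.
use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;

#[derive(Clone)]
struct VolumeSpec {
    uuid: String,
    managed: bool,
    owner: Option<String>,
}

#[derive(Default)]
struct Specs {
    volumes: HashMap<String, Arc<Mutex<VolumeSpec>>>,
}

impl Specs {
    /// Slow pattern: clone every volume just to find one.
    fn get_volumes(&self) -> Vec<VolumeSpec> {
        self.volumes.values().map(|v| v.lock().clone()).collect()
    }

    /// Fast pattern: index straight into the map and hand back a reference.
    fn get_volume(&self, uuid: &str) -> Option<&Arc<Mutex<VolumeSpec>>> {
        self.volumes.get(uuid)
    }
}

/// Copy out only the field we need; the whole spec is never cloned and the
/// lock guard is dropped as soon as the function returns.
fn owner_of(spec: &Arc<Mutex<VolumeSpec>>) -> Option<String> {
    let volume = spec.lock();
    if !volume.managed {
        return None;
    }
    volume.owner.clone()
}

fn main() {
    let mut specs = Specs::default();
    specs.volumes.insert(
        "vol-1".into(),
        Arc::new(Mutex::new(VolumeSpec {
            uuid: "vol-1".into(),
            managed: true,
            owner: Some("nexus-1".into()),
        })),
    );

    // O(1) lookup instead of listing (and cloning) every volume.
    if let Some(volume) = specs.get_volume("vol-1") {
        let uuid = volume.lock().uuid.clone(); // take the id, then release the lock
        println!("{uuid}: owner {:?}", owner_of(volume));
    }

    // The old-style full listing still works, but it clones everything.
    assert_eq!(specs.get_volumes().len(), 1);
}
```

Keeping each lock scope this small is the same trick the hunks below apply to the nexus, pool, and replica reconcilers.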
tiagolobocastro committed Jun 7, 2022
1 parent 571019e commit 389a782
Showing 14 changed files with 296 additions and 176 deletions.
@@ -74,13 +74,18 @@ async fn destroy_orphaned_nexus(
Ok(guard) => guard,
Err(_) => return PollResult::Ok(PollerState::Busy),
};
let nexus_clone = nexus_spec.lock().clone();

if !nexus_clone.managed {
return PollResult::Ok(PollerState::Idle);
}
if let Some(owner) = &nexus_clone.owner {
if context.specs().get_volume(owner).is_err() {
let owner = {
let nexus = nexus_spec.lock();
if !nexus.managed {
return PollResult::Ok(PollerState::Idle);
}
nexus.owner.clone()
};

if let Some(owner) = owner {
let nexus_clone = nexus_spec.lock().clone();
if context.specs().get_volume(&owner).is_err() {
nexus_clone.warn_span(|| tracing::warn!("Attempting to disown orphaned nexus"));
context
.specs()
@@ -107,10 +112,21 @@ async fn destroy_disowned_nexus(
Err(_) => return PollResult::Ok(PollerState::Busy),
};

let nexus_clone = nexus_spec.lock().clone();
if nexus_clone.managed && !nexus_clone.owned() {
let node_online = matches!(context.registry().get_node_wrapper(&nexus_clone.node).await, Ok(node) if node.read().await.is_online());
let nexus_node = {
let nexus = nexus_spec.lock();
let destroy = nexus.managed && !nexus.owned();
let mut nexus_node = None;
if destroy {
nexus_node = Some(nexus.node.clone())
}
nexus_node
};

if let Some(nexus_node) = nexus_node {
let node_online = matches!(context.registry().get_node_wrapper(&nexus_node).await, Ok(node) if node.read().await.is_online());
if node_online {
let nexus_clone = nexus_spec.lock().clone();
let nexus_uuid = nexus_clone.uuid.clone();
async {
nexus_clone.warn_span(|| tracing::warn!("Attempting to destroy disowned nexus"));
let request = DestroyNexus::from(nexus_clone.clone());
@@ -128,7 +144,7 @@ async fn destroy_disowned_nexus(
}
}
}
.instrument(tracing::info_span!("destroy_disowned_nexus", nexus.uuid = %nexus_clone.uuid, request.reconcile = true))
.instrument(tracing::info_span!("destroy_disowned_nexus", nexus.uuid = %nexus_uuid, request.reconcile = true))
.await?;
}
}
31 changes: 17 additions & 14 deletions control-plane/agents/core/src/core/reconciler/nexus/mod.rs
@@ -86,10 +86,13 @@ async fn nexus_reconciler(
context: &PollContext,
mode: OperationMode,
) -> PollResult {
let nexus_spec_clone = nexus_spec.lock().clone();
let created = {
let nexus_spec = nexus_spec.lock();
nexus_spec.status().created()
};

let mut results = vec![];
if nexus_spec_clone.status().created() {
let mut results = Vec::with_capacity(5);
if created {
results.push(faulted_children_remover(nexus_spec, context, mode).await);
results.push(unknown_children_remover(nexus_spec, context, mode).await);
results.push(missing_children_remover(nexus_spec, context, mode).await);
@@ -108,14 +111,14 @@ pub(super) async fn faulted_children_remover(
context: &PollContext,
mode: OperationMode,
) -> PollResult {
let nexus_spec_clone = nexus_spec.lock().clone();
let nexus_uuid = nexus_spec_clone.uuid.clone();
let nexus_uuid = nexus_spec.lock().uuid.clone();
let nexus_state = context.registry().get_nexus(&nexus_uuid).await?;
let child_count = nexus_state.children.len();

// Remove faulted children only from a degraded nexus with other healthy children left
if nexus_state.status == NexusStatus::Degraded && child_count > 1 {
async {
let nexus_spec_clone = nexus_spec.lock().clone();
for child in nexus_state.children.iter().filter(|c| c.state.faulted()) {
nexus_spec_clone
.warn_span(|| tracing::warn!("Attempting to remove faulted child '{}'", child.uri));
@@ -157,17 +160,17 @@ pub(super) async fn unknown_children_remover(
mode: OperationMode,
) -> PollResult {
let nexus_spec_clone = nexus_spec.lock().clone();
let nexus_uuid = nexus_spec_clone.uuid.clone();
let nexus_state = context.registry().get_nexus(&nexus_uuid).await?;
let nexus_state = context.registry().get_nexus(&nexus_spec_clone.uuid).await?;
let state_children = nexus_state.children.iter();
let spec_children = nexus_spec.lock().children.clone();
let spec_children = nexus_spec_clone.children.clone();

let unknown_children = state_children
.filter(|c| !spec_children.iter().any(|spec| spec.uri() == c.uri))
.cloned()
.collect::<Vec<_>>();

if !unknown_children.is_empty() {
let nexus_uuid = nexus_spec_clone.uuid.clone();
async move {
for child in unknown_children {
nexus_spec_clone
@@ -259,8 +262,7 @@ pub(super) async fn missing_nexus_recreate(
context: &PollContext,
mode: OperationMode,
) -> PollResult {
let nexus = nexus_spec.lock().clone();
let nexus_uuid = nexus.uuid.clone();
let nexus_uuid = nexus_spec.lock().uuid.clone();

if context.registry().get_nexus(&nexus_uuid).await.is_ok() {
return PollResult::Ok(PollerState::Idle);
@@ -354,6 +356,7 @@ pub(super) async fn missing_nexus_recreate(
}
}

let nexus = nexus_spec.lock().clone();
missing_nexus_recreate(nexus, context, mode).await
}

@@ -366,10 +369,10 @@ pub(super) async fn fixup_nexus_protocol(
context: &PollContext,
mode: OperationMode,
) -> PollResult {
let nexus = nexus_spec.lock().clone();
let nexus_uuid = nexus.uuid.clone();
let nexus_uuid = nexus_spec.lock().uuid.clone();

if let Ok(nexus_state) = context.registry().get_nexus(&nexus_uuid).await {
let nexus = nexus_spec.lock().clone();
if nexus.share != nexus_state.share {
nexus.warn_span(|| {
tracing::warn!(
@@ -421,11 +424,11 @@ pub(super) async fn faulted_nexus_remover(
context: &PollContext,
_mode: OperationMode,
) -> PollResult {
let nexus = nexus_spec.lock().clone();
let nexus_uuid = nexus.uuid.clone();
let nexus_uuid = nexus_spec.lock().uuid.clone();

if let Ok(nexus_state) = context.registry().get_nexus(&nexus_uuid).await {
if nexus_state.status == NexusStatus::Faulted {
let nexus = nexus_spec.lock().clone();
let healthy_children = get_healthy_nexus_children(&nexus, context.registry()).await?;
let node = context.registry().get_node_wrapper(&nexus.node).await?;

12 changes: 6 additions & 6 deletions control-plane/agents/core/src/core/reconciler/pool/mod.rs
@@ -35,8 +35,8 @@ impl TaskPoller for PoolReconciler
async fn poll(&mut self, context: &PollContext) -> PollResult {
let mut results = vec![];
for pool in context.specs().get_locked_pools() {
results.push(missing_pool_state_reconciler(pool.clone(), context).await);
results.push(deleting_pool_spec_reconciler(pool.clone(), context).await);
results.push(missing_pool_state_reconciler(&pool, context).await);
results.push(deleting_pool_spec_reconciler(&pool, context).await);
}
Self::squash_results(results)
}
@@ -54,7 +54,7 @@ impl TaskPoller for PoolReconciler {
/// should exist.
#[tracing::instrument(skip(pool_spec, context), level = "trace", fields(pool.uuid = %pool_spec.lock().id, request.reconcile = true))]
async fn missing_pool_state_reconciler(
pool_spec: Arc<Mutex<PoolSpec>>,
pool_spec: &Arc<Mutex<PoolSpec>>,
context: &PollContext,
) -> PollResult {
if !pool_spec.lock().status().created() {
@@ -83,11 +83,11 @@ async fn missing_pool_state_reconciler(
let node = match context.registry().get_node_wrapper(&pool.node).await {
Ok(node) if !node.read().await.is_online() => {
let node_status = node.read().await.status();
warn_missing(&pool_spec, node_status);
warn_missing(pool_spec, node_status);
return PollResult::Ok(PollerState::Idle);
}
Err(_) => {
warn_missing(&pool_spec, NodeStatus::Unknown);
warn_missing(pool_spec, NodeStatus::Unknown);
return PollResult::Ok(PollerState::Idle);
}
Ok(node) => node,
@@ -123,7 +123,7 @@ async fn missing_pool_state_reconciler(
/// cleans up any such pool when node comes up.
#[tracing::instrument(skip(pool_spec, context), level = "trace", fields(pool.uuid = %pool_spec.lock().id, request.reconcile = true))]
async fn deleting_pool_spec_reconciler(
pool_spec: Arc<Mutex<PoolSpec>>,
pool_spec: &Arc<Mutex<PoolSpec>>,
context: &PollContext,
) -> PollResult {
if !pool_spec.lock().status().deleting() {
143 changes: 76 additions & 67 deletions control-plane/agents/core/src/core/reconciler/replica/mod.rs
@@ -7,8 +7,12 @@ use crate::core::{
PollContext, PollEvent, PollResult, PollTimer, PollTriggerEvent, PollerState, TaskPoller,
},
};
use common_lib::types::v0::{message_bus::ReplicaOwners, store::OperationMode};
use std::ops::Deref;
use common_lib::types::v0::{
message_bus::ReplicaOwners,
store::{replica::ReplicaSpec, OperationMode},
};
use parking_lot::Mutex;
use std::{ops::Deref, sync::Arc};

/// Replica reconciler
#[derive(Debug)]
@@ -29,8 +33,11 @@ impl ReplicaReconciler {
impl TaskPoller for ReplicaReconciler {
async fn poll(&mut self, context: &PollContext) -> PollResult {
let mut results = vec![];
results.push(disown_missing_owners(context).await);
results.push(destroy_orphaned_replicas(context).await);

for replica in context.specs().get_replicas() {
results.push(disown_missing_owners(context, &replica).await);
results.push(destroy_orphaned_replicas(context, &replica).await);
}
Self::squash_results(results)
}

@@ -49,45 +56,46 @@ impl TaskPoller for ReplicaReconciler {
/// Remove replica owners who no longer exist.
/// In the event that the replicas become orphaned (have no owners) they will be destroyed by the
/// 'destroy_orphaned_replicas' reconcile loop.
async fn disown_missing_owners(context: &PollContext) -> PollResult {
async fn disown_missing_owners(
context: &PollContext,
replica: &Arc<Mutex<ReplicaSpec>>,
) -> PollResult {
let specs = context.specs();

for replica in specs.get_replicas() {
// If we obtain the operation guard no one else can be modifying the replica spec.
if let Ok(_guard) = replica.operation_guard(OperationMode::ReconcileStart) {
let replica_spec = replica.lock().clone();

if replica_spec.managed && replica_spec.owned() {
let mut owner_removed = false;
let owners = &replica_spec.owners;

if let Some(volume) = owners.volume() {
if specs.get_volume(volume).is_err() {
// The volume no longer exists. Remove it as an owner.
replica.lock().owners.disowned_by_volume();
owner_removed = true;
tracing::info!(replica.uuid=%replica_spec.uuid, volume.uuid=%volume, "Removed volume as replica owner");
}
};

owners.nexuses().iter().for_each(|nexus| {
if specs.get_nexus(nexus).is_none() {
// The nexus no longer exists. Remove it as an owner.
replica.lock().owners.disowned_by_nexus(nexus);
owner_removed = true;
tracing::info!(replica.uuid=%replica_spec.uuid, nexus.uuid=%nexus, "Removed nexus as replica owner");
}
});

if owner_removed {
let replica_clone = replica.lock().clone();
if let Err(error) = context.registry().store_obj(&replica_clone).await {
// Log the fact that we couldn't persist the changes.
// If we reload the stale info from the persistent store (on a restart) we
// will run this reconcile loop again and tidy it up, so no need to retry
// here.
tracing::error!(replica.uuid=%replica_clone.uuid, error=%error, "Failed to persist disowned replica")
}
// If we obtain the operation guard no one else can be modifying the replica spec.
if let Ok(_guard) = replica.operation_guard(OperationMode::ReconcileStart) {
let replica_spec = replica.lock().clone();

if replica_spec.managed && replica_spec.owned() {
let mut owner_removed = false;
let owners = &replica_spec.owners;

if let Some(volume) = owners.volume() {
if specs.get_volume(volume).is_err() {
// The volume no longer exists. Remove it as an owner.
replica.lock().owners.disowned_by_volume();
owner_removed = true;
tracing::info!(replica.uuid=%replica_spec.uuid, volume.uuid=%volume, "Removed volume as replica owner");
}
};

owners.nexuses().iter().for_each(|nexus| {
if specs.get_nexus(nexus).is_none() {
// The nexus no longer exists. Remove it as an owner.
replica.lock().owners.disowned_by_nexus(nexus);
owner_removed = true;
tracing::info!(replica.uuid=%replica_spec.uuid, nexus.uuid=%nexus, "Removed nexus as replica owner");
}
});

if owner_removed {
let replica_clone = replica.lock().clone();
if let Err(error) = context.registry().store_obj(&replica_clone).await {
// Log the fact that we couldn't persist the changes.
// If we reload the stale info from the persistent store (on a restart) we
// will run this reconcile loop again and tidy it up, so no need to retry
// here.
tracing::error!(replica.uuid=%replica_clone.uuid, error=%error, "Failed to persist disowned replica")
}
}
}
@@ -98,32 +106,33 @@ async fn disown_missing_owners(context: &PollContext) -> PollResult {

/// Destroy orphaned replicas.
/// Orphaned replicas are those that are managed but which don't have any owners.
async fn destroy_orphaned_replicas(context: &PollContext) -> PollResult {
for replica in context.specs().get_replicas() {
let _guard = match replica.operation_guard(OperationMode::ReconcileStart) {
Ok(guard) => guard,
Err(_) => return PollResult::Ok(PollerState::Busy),
};

let replica_spec = replica.lock().deref().clone();
if replica_spec.managed && !replica_spec.owned() {
match context
.specs()
.destroy_replica_spec(
context.registry(),
&replica_spec,
ReplicaOwners::default(),
false,
OperationMode::ReconcileStep,
)
.await
{
Ok(_) => {
tracing::info!(replica.uuid=%replica_spec.uuid, "Successfully destroyed orphaned replica");
}
Err(e) => {
tracing::trace!(replica.uuid=%replica_spec.uuid, error=%e, "Failed to destroy orphaned replica");
}
async fn destroy_orphaned_replicas(
context: &PollContext,
replica: &Arc<Mutex<ReplicaSpec>>,
) -> PollResult {
let _guard = match replica.operation_guard(OperationMode::ReconcileStart) {
Ok(guard) => guard,
Err(_) => return PollResult::Ok(PollerState::Busy),
};

let replica_spec = replica.lock().deref().clone();
if replica_spec.managed && !replica_spec.owned() {
match context
.specs()
.destroy_replica_spec(
context.registry(),
&replica_spec,
ReplicaOwners::default(),
false,
OperationMode::ReconcileStep,
)
.await
{
Ok(_) => {
tracing::info!(replica.uuid=%replica_spec.uuid, "Successfully destroyed orphaned replica");
}
Err(e) => {
tracing::trace!(replica.uuid=%replica_spec.uuid, error=%e, "Failed to destroy orphaned replica");
}
}
}
11 changes: 6 additions & 5 deletions control-plane/agents/core/src/core/reconciler/volume/nexus.rs
@@ -47,12 +47,13 @@ async fn volume_nexus_reconcile(
Ok(guard) => guard,
Err(_) => return PollResult::Ok(PollerState::Busy),
};
let volume = volume_spec.lock().clone();

if !volume.policy.self_heal || !volume.status.created() {
return PollResult::Ok(PollerState::Idle);
{
let volume = volume_spec.lock();
if !volume.policy.self_heal || !volume.status.created() {
return PollResult::Ok(PollerState::Idle);
}
}

let volume = volume_spec.lock().clone();
match context.specs().get_volume_target_nexus(&volume) {
Some(nexus_spec) => {
let _guard = match nexus_spec.operation_guard(OperationMode::ReconcileStart) {