storage controller: make chaos less disruptive to AZ locality
jcsp committed Jan 17, 2025
1 parent 871e8b3 commit acbfb8c
Showing 2 changed files with 96 additions and 37 deletions.
116 changes: 79 additions & 37 deletions storage_controller/src/service/chaos_injector.rs
@@ -1,11 +1,17 @@
-use std::{sync::Arc, time::Duration};
+use std::{
+    collections::{BTreeMap, HashMap},
+    sync::Arc,
+    time::Duration,
+};
 
 use pageserver_api::controller_api::ShardSchedulingPolicy;
 use rand::seq::SliceRandom;
 use rand::thread_rng;
 use tokio_util::sync::CancellationToken;
+use utils::id::NodeId;
+use utils::shard::TenantShardId;
 
-use super::Service;
+use super::{Node, Scheduler, Service, TenantShard};
 
 pub struct ChaosInjector {
     service: Arc<Service>,
@@ -35,50 +41,86 @@ impl ChaosInjector {
         }
     }
 
+    /// If a shard has a secondary and attached location, then re-assign the secondary to be
+    /// attached and the attached to be secondary.
+    ///
+    /// Only modifies tenants if they're in Active scheduling policy.
+    fn maybe_migrate_to_secondary(
+        &self,
+        tenant_shard_id: TenantShardId,
+        nodes: &Arc<HashMap<NodeId, Node>>,
+        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
+        scheduler: &mut Scheduler,
+    ) {
+        let shard = tenants
+            .get_mut(&tenant_shard_id)
+            .expect("Held lock between choosing ID and this get");
+
+        if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
+            // Skip non-active scheduling policies, so that a shard with a policy like Pause can
+            // be pinned without being disrupted by us.
+            tracing::info!(
+                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
+                shard.get_scheduling_policy()
+            );
+            return;
+        }
+
+        // Pick a secondary to promote
+        let Some(new_location) = shard
+            .intent
+            .get_secondary()
+            .choose(&mut thread_rng())
+            .cloned()
+        else {
+            tracing::info!(
+                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
+            );
+            return;
+        };
+
+        let Some(old_location) = *shard.intent.get_attached() else {
+            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
+            return;
+        };
+
+        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
+
+        shard.intent.demote_attached(scheduler, old_location);
+        shard.intent.promote_attached(scheduler, new_location);
+        self.service.maybe_reconcile_shard(shard, nodes);
+    }
+
     async fn inject_chaos(&mut self) {
         // Pick some shards to interfere with
         let batch_size = 128;
         let mut inner = self.service.inner.write().unwrap();
         let (nodes, tenants, scheduler) = inner.parts_mut();
         let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
-        let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
 
-        for victim in victims {
-            let shard = tenants
-                .get_mut(victim)
-                .expect("Held lock between choosing ID and this get");
-
-            if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
-                // Skip non-active scheduling policies, so that a shard with a policy like Pause can
-                // be pinned without being disrupted by us.
-                tracing::info!(
-                    "Skipping shard {victim}: scheduling policy is {:?}",
-                    shard.get_scheduling_policy()
-                );
-                continue;
+        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the chaos injector
+        // continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
+        // random tenants to move, and then on next chaos iteration moving them back, then picking some new
+        // random tenants on the next iteration.
+        let mut victims = Vec::with_capacity(batch_size);
+        for shard in tenants.values() {
+            if shard.is_attached_outside_preferred_az(nodes) {
+                victims.push(shard.tenant_shard_id);
             }
 
-            // Pick a secondary to promote
-            let Some(new_location) = shard
-                .intent
-                .get_secondary()
-                .choose(&mut thread_rng())
-                .cloned()
-            else {
-                tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
-                continue;
-            };
-
-            let Some(old_location) = *shard.intent.get_attached() else {
-                tracing::info!("Skipping shard {victim}: currently has no attached location");
-                continue;
-            };
-
-            tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
-
-            shard.intent.demote_attached(scheduler, old_location);
-            shard.intent.promote_attached(scheduler, new_location);
-            self.service.maybe_reconcile_shard(shard, nodes);
+            if victims.len() >= batch_size {
+                break;
+            }
         }
+
+        let choose_random = batch_size.saturating_sub(victims.len());
+        tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {choose_random} random shards to migrate", victims.len());
+
+        let random_victims = tenant_ids.choose_multiple(&mut thread_rng(), choose_random);
+        victims.extend(random_victims);
+
+        for victim in victims {
+            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
+        }
     }
 }
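
For illustration, here is a minimal standalone sketch of the selection policy the new inject_chaos applies. The types and names (ShardId, Shard, pick_victims) are hypothetical stand-ins rather than the controller's real ones: shards attached outside their preferred AZ fill the batch first, and the remainder is topped up with random picks, mirroring the batch_size.saturating_sub(victims.len()) arithmetic in the diff above.

use rand::seq::SliceRandom;
use rand::thread_rng;

#[derive(Clone, Copy, Debug)]
struct ShardId(u32);

struct Shard {
    id: ShardId,
    attached_az: Option<&'static str>,
    preferred_az: Option<&'static str>,
}

/// Fill the batch with shards attached outside their home AZ first, then top up with
/// random picks so the batch stays the same size.
fn pick_victims(shards: &[Shard], batch_size: usize) -> Vec<ShardId> {
    // Shards whose attached AZ is known and differs from a known preferred AZ go first.
    let mut victims: Vec<ShardId> = shards
        .iter()
        .filter(|s| matches!((s.attached_az, s.preferred_az), (Some(a), Some(p)) if a != p))
        .map(|s| s.id)
        .take(batch_size)
        .collect();

    // Whatever is left of the batch is picked at random from all shards, so the injector
    // still exercises shards that are already in their home AZ.
    let choose_random = batch_size.saturating_sub(victims.len());
    let all_ids: Vec<ShardId> = shards.iter().map(|s| s.id).collect();
    victims.extend(
        all_ids
            .choose_multiple(&mut thread_rng(), choose_random)
            .cloned(),
    );
    victims
}

fn main() {
    let shards = vec![
        Shard { id: ShardId(1), attached_az: Some("az-a"), preferred_az: Some("az-a") },
        Shard { id: ShardId(2), attached_az: Some("az-b"), preferred_az: Some("az-a") },
        Shard { id: ShardId(3), attached_az: Some("az-c"), preferred_az: None },
    ];
    // Shard 2 is outside its home AZ, so it is always selected; one more shard is then
    // chosen at random to fill the batch of two.
    println!("{:?}", pick_victims(&shards, 2));
}

As in the diff, the random top-up draws from all shards, so it can re-pick a shard that is already in the batch.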
17 changes: 17 additions & 0 deletions storage_controller/src/tenant_shard.rs
@@ -1793,6 +1793,23 @@ impl TenantShard {
             }
         }
     }
+
+    /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
+    ///
+    /// If the shard does not have a preferred AZ, returns false.
+    pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
+        self.intent
+            .get_attached()
+            .map(|node_id| {
+                Some(
+                    nodes
+                        .get(&node_id)
+                        .expect("referenced node exists")
+                        .get_availability_zone_id(),
+                ) == self.intent.preferred_az_id.as_ref()
+            })
+            .unwrap_or(false)
+    }
 }
 
 impl Drop for TenantShard {
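To make the edge cases of the AZ check concrete, here is a plain-strings sketch of the behaviour the doc comment describes. attached_outside_preferred is a hypothetical helper over Option<&str> values, not the committed code: it reports true only when both the attached node's AZ and the preferred AZ are known and they differ.

/// Hypothetical stand-in for the AZ check, using plain strings instead of NodeId/Node.
fn attached_outside_preferred(attached_az: Option<&str>, preferred_az: Option<&str>) -> bool {
    attached_az
        .map(|az| preferred_az.map_or(false, |preferred| az != preferred))
        .unwrap_or(false)
}

fn main() {
    assert!(attached_outside_preferred(Some("az-b"), Some("az-a"))); // attached away from its home AZ
    assert!(!attached_outside_preferred(Some("az-a"), Some("az-a"))); // already in the home AZ
    assert!(!attached_outside_preferred(Some("az-a"), None)); // no preferred AZ: returns false
    assert!(!attached_outside_preferred(None, Some("az-a"))); // no attached location: returns false
}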
