Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage controller: use proper ScheduleContext when evacuating a node #9908

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 78 additions & 121 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5498,49 +5498,51 @@ impl Service {

let mut tenants_affected: usize = 0;

for (tenant_shard_id, tenant_shard) in tenants {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}

if nodes.len() == 1 {
// Special case for single-node cluster: there is no point trying to reschedule
// any tenant shards: avoid doing so, in order to avoid spewing warnings about
// failures to schedule them.
continue;
}
for (_tenant_id, mut schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
{
for tenant_shard in shards {
let tenant_shard_id = tenant_shard.tenant_shard_id;
if let Some(observed_loc) =
tenant_shard.observed.locations.get_mut(&node_id)
{
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}

if !nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}
if nodes.len() == 1 {
// Special case for single-node cluster: there is no point trying to reschedule
// any tenant shards: avoid doing so, in order to avoid spewing warnings about
// failures to schedule them.
continue;
}

if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
if !nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}

// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
let mut schedule_context = ScheduleContext::default();
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();

match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantShard a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantShard a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
}
}
}
}
Expand Down Expand Up @@ -6011,14 +6013,8 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
let pageservers = nodes.clone();

let mut schedule_context = ScheduleContext::default();

let mut reconciles_spawned = 0;
for (tenant_shard_id, shard) in tenants.iter_mut() {
if tenant_shard_id.is_shard_zero() {
schedule_context = ScheduleContext::default();
}

for shard in tenants.values_mut() {
// Skip checking if this shard is already enqueued for reconciliation
if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
// If there is something delayed, then return a nonzero count so that
Expand All @@ -6033,8 +6029,6 @@ impl Service {
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
}

schedule_context.avoid(&shard.intent.all_pageservers());
}

reconciles_spawned
Expand Down Expand Up @@ -6103,95 +6097,62 @@ impl Service {
}

fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
let mut schedule_context = ScheduleContext::default();

let mut tenant_shards: Vec<&TenantShard> = Vec::new();

// How many candidate optimizations we will generate, before evaluating them for readniess: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;

let mut work = Vec::new();

let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, shard) in tenants.iter() {
if tenant_shard_id.is_shard_zero() {
// Reset accumulators on the first shard in a tenant
schedule_context = ScheduleContext::default();
schedule_context.mode = ScheduleMode::Speculative;
tenant_shards.clear();
}

if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}

match shard.get_scheduling_policy() {
ShardSchedulingPolicy::Active => {
// Ok to do optimization
for (_tenant_id, schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
{
for shard in shards {
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause
| ShardSchedulingPolicy::Stop => {
// Policy prevents optimizing this shard.
continue;
match shard.get_scheduling_policy() {
ShardSchedulingPolicy::Active => {
// Ok to do optimization
}
ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause
| ShardSchedulingPolicy::Stop => {
// Policy prevents optimizing this shard.
continue;
}
}
}

// Accumulate the schedule context for all the shards in a tenant: we must have
// the total view of all shards before we can try to optimize any of them.
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
tenant_shards.push(shard);

// Once we have seen the last shard in the tenant, proceed to search across all shards
// in the tenant for optimizations
if shard.shard.number.0 == shard.shard.count.count() - 1 {
if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
if !matches!(shard.splitting, SplitState::Idle)
|| matches!(shard.policy, PlacementPolicy::Detached)
|| shard.reconciler.is_some()
{
// Do not start any optimizations while another change to the tenant is ongoing: this
// is not necessary for correctness, but simplifies operations and implicitly throttles
// optimization changes to happen in a "trickle" over time.
continue;
}

if tenant_shards.iter().any(|s| {
!matches!(s.splitting, SplitState::Idle)
|| matches!(s.policy, PlacementPolicy::Detached)
}) {
// Never attempt to optimize a tenant that is currently being split, or
// a tenant that is meant to be detached
continue;
}

// TODO: optimization calculations are relatively expensive: create some fast-path for
// the common idle case (avoiding the search on tenants that we have recently checked)

for shard in &tenant_shards {
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
} else if let Some(optimization) =
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
// better placed on another node, based on ScheduleContext, then adjust it. This
// covers cases like after a shard split, where we might have too many shards
// in the same tenant with secondary locations on the node where they originally split.
shard.optimize_secondary(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
}

// TODO: extend this mechanism to prefer attaching on nodes with fewer attached
// tenants (i.e. extend schedule state to distinguish attached from secondary counts),
// for the total number of attachments on a node (not just within a tenant.)
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
} else if let Some(optimization) =
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
// better placed on another node, based on ScheduleContext, then adjust it. This
// covers cases like after a shard split, where we might have too many shards
// in the same tenant with secondary locations on the node where they originally split.
shard.optimize_secondary(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
}
}
}
Expand Down Expand Up @@ -7094,10 +7055,6 @@ impl Service {
}
}

/// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
/// tenant while considering the individual shards within it. This iterator is a helper
/// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
/// for the tenant.
jcsp marked this conversation as resolved.
Show resolved Hide resolved
struct TenantShardContextIterator<'a> {
jcsp marked this conversation as resolved.
Show resolved Hide resolved
schedule_mode: ScheduleMode,
inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
Expand Down
Loading