Skip to content

Commit

Permalink
Assign all sources/shards, even if this requires exceeding the indexer
Browse files Browse the repository at this point in the history
capacities.

We do that simply by scaling the indexer capacities.
First we scale the indexer capacities so that the sum of their
capacities exceeds the total load of the source to assign.

If the packing algorithm fails, we inflate the capacities again
by 10% and retry.

Added assert on convert solution to physical solution.

Closes #4290
  • Loading branch information
fulmicoton committed Jan 12, 2024
1 parent 44360ac commit d213de3
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 161 deletions.
54 changes: 16 additions & 38 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,21 +192,29 @@ impl IndexingScheduler {
// has happened.
pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
if indexers.is_empty() {
warn!("no indexer available, cannot schedule an indexing plan");
return;
};

let sources = get_sources_to_schedule(model);

let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();

let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
.map(|(indexer_id, indexer_node_info)| {
(indexer_id.to_string(), indexer_node_info.indexing_capacity)
.filter_map(|(indexer_id, indexer_node_info)| {
if indexer_node_info.indexing_capacity.cpu_millis() > 0 {
Some((indexer_id.to_string(), indexer_node_info.indexing_capacity))
} else {
None
}
})
.collect();

if indexer_id_to_cpu_capacities.is_empty() {
if !sources.is_empty() {
warn!("no indexing capacity available, cannot schedule an indexing plan");
}
return;
};

let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_id_to_cpu_capacities,
Expand Down Expand Up @@ -780,37 +788,7 @@ mod tests {
let indexer_id = format!("indexer-{i}");
indexer_max_loads.insert(indexer_id, mcpu(4_000));
}
let physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
let source_map: FnvHashMap<&SourceUid, &SourceToSchedule> = sources
.iter()
.map(|source| (&source.source_uid, source))
.collect();
for (node_id, tasks) in physical_indexing_plan.indexing_tasks_per_indexer() {
let mut load_in_node = 0u32;
for task in tasks {
let source_uid = SourceUid {
index_uid: IndexUid::from(task.index_uid.clone()),
source_id: task.source_id.clone(),
};
let source_to_schedule = source_map.get(&source_uid).unwrap();
match &source_to_schedule.source_type {
SourceToScheduleType::IngestV1 => {}
SourceToScheduleType::Sharded {
load_per_shard,
..
} => {
load_in_node += load_per_shard.get() * task.shard_ids.len() as u32;
}
SourceToScheduleType::NonSharded {
num_pipelines: _ ,
load_per_pipeline,
} => {
load_in_node += load_per_pipeline.get();
}
}
}
assert!(load_in_node <= indexer_max_loads.get(node_id).unwrap().cpu_millis());
}
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl IdToOrdMap {
fn convert_physical_plan_to_solution(
plan: &PhysicalIndexingPlan,
id_to_ord_map: &IdToOrdMap,
sources: &[SourceToSchedule],
solution: &mut SchedulingSolution,
) {
for (indexer_id, indexing_tasks) in plan.indexing_tasks_per_indexer() {
Expand All @@ -124,7 +125,21 @@ fn convert_physical_plan_to_solution(
source_id: indexing_task.source_id.clone(),
};
if let Some(source_ord) = id_to_ord_map.source_ord(&source_uid) {
indexer_assignment.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
let source_type = &sources[source_ord as usize].source_type;
match source_type {
SourceToScheduleType::Sharded { .. } => {
indexer_assignment
.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
}
SourceToScheduleType::NonSharded { .. } => {
// For non-sharded sources like Kafka, one pipeline = one shard in the
// solutions
indexer_assignment.add_shards(source_ord, 1);
}
SourceToScheduleType::IngestV1 => {
// Ingest V1 is not part of the logical placement algorithm.
}
}
}
}
}
Expand Down Expand Up @@ -310,16 +325,17 @@ fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &st
/// We do not support moving shard from one pipeline to another, so if required this function may
/// also return instruction about deleting / adding new shards.
fn convert_scheduling_solution_to_physical_plan(
mut solution: SchedulingSolution,
solution: &SchedulingSolution,
id_to_ord_map: &IdToOrdMap,
sources: &[SourceToSchedule],
previous_plan_opt: Option<&PhysicalIndexingPlan>,
) -> PhysicalIndexingPlan {
let mut indexer_assignments = solution.indexer_assignments.clone();
let mut new_physical_plan = PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids);
for (indexer_id, indexer_assignment) in id_to_ord_map
.indexer_ids
.iter()
.zip(&mut solution.indexer_assignments)
.zip(&mut indexer_assignments)
{
let previous_tasks_for_indexer = previous_plan_opt
.and_then(|previous_plan| previous_plan.indexer(indexer_id))
Expand Down Expand Up @@ -349,7 +365,7 @@ fn convert_scheduling_solution_to_physical_plan(
for (indexer, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer_mut() {
let indexer_ord = id_to_ord_map.indexer_ord(indexer).unwrap();
let mut num_shards_for_indexer_source: u32 =
solution.indexer_assignments[indexer_ord].num_shards(source_ord);
indexer_assignments[indexer_ord].num_shards(source_ord);
for indexing_task in indexing_tasks {
if indexing_task.index_uid == source.source_uid.index_uid.as_str()
&& indexing_task.source_id == source.source_uid.source_id
Expand Down Expand Up @@ -395,11 +411,38 @@ fn convert_scheduling_solution_to_physical_plan(
}
}

assert_post_condition_physical_plan_match_solution(
&new_physical_plan,
solution,
sources,
id_to_ord_map,
);

new_physical_plan.normalize();

new_physical_plan
}

// Checks that the physical plan indeed matches the scheduling solution.
//
// This is a post-condition check: we convert the physical plan back into a
// scheduling solution and require it to be equal to the solution we started
// from. Panics (via `assert_eq!`) if the round-trip does not match.
fn assert_post_condition_physical_plan_match_solution(
    physical_plan: &PhysicalIndexingPlan,
    solution: &SchedulingSolution,
    sources: &[SourceToSchedule],
    id_to_ord_map: &IdToOrdMap,
) {
    let num_indexers = physical_plan.indexing_tasks_per_indexer().len();
    // All three views of the cluster must agree on the number of indexers.
    assert_eq!(num_indexers, solution.indexer_assignments.len());
    assert_eq!(num_indexers, id_to_ord_map.indexer_ids.len());
    // Rebuild a solution from the physical plan and compare it to the input.
    let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
    convert_physical_plan_to_solution(
        physical_plan,
        id_to_ord_map,
        sources,
        &mut reconstructed_solution,
    );
    assert_eq!(solution, &reconstructed_solution);
}

fn add_shard_to_indexer(
missing_shard: ShardId,
indexer: &str,
Expand Down Expand Up @@ -435,6 +478,35 @@ fn add_shard_to_indexer(
}
}

// If the total node capacity is lower than 120% of the problem load, this
// function scales the indexer capacities to reach this limit.
// NOTE(review): the original comment said 110%, but the code below inflates
// the load by a factor of 1.2 (i.e. 120%); the comment is corrected to match
// the code.
fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
    // First we scale the problem to the point where any indexer can fit the largest shard.
    let Some(largest_shard_load) = problem.sources().map(|source| source.load_per_shard).max()
    else {
        // No sources: nothing to schedule, no scaling needed.
        return;
    };
    let min_indexer_capacity = (0..problem.num_indexers())
        .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
        .min()
        .expect("At least one indexer is required");
    // A zero-capacity indexer would make the scaling factor below infinite.
    assert_ne!(min_indexer_capacity.cpu_millis(), 0);
    if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
        // Scale every capacity up so the smallest indexer fits the largest shard.
        let scaling_factor =
            (largest_shard_load.get() as f32) / (min_indexer_capacity.cpu_millis() as f32);
        problem.scale_node_capacities(scaling_factor);
    }

    let total_node_capacities: f32 = problem.total_node_capacities().cpu_millis() as f32;
    let total_load: f32 = problem.total_load() as f32;
    // Require 20% headroom over the total load.
    let inflated_total_load = total_load * 1.2f32;
    if inflated_total_load >= total_node_capacities {
        // We need to inflate our node capacities to match the problem.
        let ratio = inflated_total_load / total_node_capacities;
        problem.scale_node_capacities(ratio);
    }
}

/// Creates a physical plan given the current situation of the cluster and the list of sources
/// to schedule.
///
Expand Down Expand Up @@ -470,6 +542,7 @@ pub fn build_physical_indexing_plan(
}

let mut problem = SchedulingProblem::with_indexer_cpu_capacities(indexer_cpu_capacities);

for source in sources {
if let Some(source_ord) = populate_problem(source, &mut problem) {
let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
Expand All @@ -480,22 +553,20 @@ pub fn build_physical_indexing_plan(
// Populate the previous solution.
let mut previous_solution = problem.new_solution();
if let Some(previous_plan) = previous_plan_opt {
convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution);
convert_physical_plan_to_solution(
previous_plan,
&id_to_ord_map,
sources,
&mut previous_solution,
);
}

// Compute the new scheduling solution
let (new_solution, unassigned_shards) = scheduling_logic::solve(&problem, previous_solution);

if !unassigned_shards.is_empty() {
// TODO this is probably a bad idea to just not overschedule, as having a single index trail
// behind will prevent the log GC.
// A better strategy would probably be to close shard, and start prevent ingestion.
error!("unable to assign all sources in the cluster");
}
let new_solution = scheduling_logic::solve(problem, previous_solution);

// Convert the new scheduling solution back to a physical plan.
convert_scheduling_solution_to_physical_plan(
new_solution,
&new_solution,
&id_to_ord_map,
sources,
previous_plan_opt,
Expand Down Expand Up @@ -602,7 +673,7 @@ mod tests {
let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 1);
assert_eq!(expected_tasks.len(), 2);
assert_eq!(&expected_tasks[0].source_id, &source_uid1.source_id);
}
{
Expand Down
Loading

0 comments on commit d213de3

Please sign in to comment.