diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 725967bba0b..896a62bebb1 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -196,21 +196,29 @@ impl IndexingScheduler {
     // has happened.
     pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) {
         crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
-        let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
-        if indexers.is_empty() {
-            warn!("no indexer available, cannot schedule an indexing plan");
-            return;
-        };
         let sources = get_sources_to_schedule(model);
+        let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
+
         let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
             .iter()
-            .map(|(indexer_id, indexer_node_info)| {
-                (indexer_id.to_string(), indexer_node_info.indexing_capacity)
+            .filter_map(|(indexer_id, indexer_node_info)| {
+                if indexer_node_info.indexing_capacity.cpu_millis() > 0 {
+                    Some((indexer_id.to_string(), indexer_node_info.indexing_capacity))
+                } else {
+                    None
+                }
             })
             .collect();
+        if indexer_id_to_cpu_capacities.is_empty() {
+            if !sources.is_empty() {
+                warn!("no indexing capacity available, cannot schedule an indexing plan");
+            }
+            return;
+        };
+
         let new_physical_plan = build_physical_indexing_plan(
             &sources,
             &indexer_id_to_cpu_capacities,
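The hunk above changes when the scheduler bails out: indexers advertising zero CPU capacity are now filtered out up front, and the warning only fires if there are sources left to place. A minimal standalone sketch of that filtering step, with plain (String, u32) pairs standing in for IndexerNodeInfo (simplified, hypothetical types):

    use std::collections::HashMap;

    fn usable_capacities(indexers: &[(String, u32)]) -> HashMap<String, u32> {
        indexers
            .iter()
            // Indexers advertising 0 mCPU are excluded up front, so an empty
            // result now means "no capacity", not merely "no indexers".
            .filter_map(|(indexer_id, capacity_mcpu)| {
                if *capacity_mcpu > 0 {
                    Some((indexer_id.clone(), *capacity_mcpu))
                } else {
                    None
                }
            })
            .collect()
    }

    fn main() {
        let indexers = vec![
            ("indexer-1".to_string(), 0u32),
            ("indexer-2".to_string(), 4_000u32),
        ];
        let capacities = usable_capacities(&indexers);
        assert_eq!(capacities.len(), 1);
        assert!(capacities.contains_key("indexer-2"));
    }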
@@ -808,37 +816,7 @@ mod tests {
             let indexer_id = format!("indexer-{i}");
             indexer_max_loads.insert(indexer_id, mcpu(4_000));
         }
-        let physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
-        let source_map: FnvHashMap<&SourceUid, &SourceToSchedule> = sources
-            .iter()
-            .map(|source| (&source.source_uid, source))
-            .collect();
-        for (node_id, tasks) in physical_indexing_plan.indexing_tasks_per_indexer() {
-            let mut load_in_node = 0u32;
-            for task in tasks {
-                let source_uid = SourceUid {
-                    index_uid: IndexUid::from(task.index_uid.clone()),
-                    source_id: task.source_id.clone(),
-                };
-                let source_to_schedule = source_map.get(&source_uid).unwrap();
-                match &source_to_schedule.source_type {
-                    SourceToScheduleType::IngestV1 => {}
-                    SourceToScheduleType::Sharded {
-                        load_per_shard,
-                        ..
-                    } => {
-                        load_in_node += load_per_shard.get() * task.shard_ids.len() as u32;
-                    }
-                    SourceToScheduleType::NonSharded {
-                        num_pipelines: _,
-                        load_per_pipeline,
-                    } => {
-                        load_in_node += load_per_pipeline.get();
-                    }
-                }
-            }
-            assert!(load_in_node <= indexer_max_loads.get(node_id).unwrap().cpu_millis());
-        }
+        let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
     }
 }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index 031783e5539..dab57839198 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -77,19 +77,22 @@ fn populate_problem(
 }
 
 #[derive(Default)]
-struct IdToOrdMap {
+struct IdToOrdMap<'a> {
     indexer_ids: Vec<String>,
-    source_uids: Vec<SourceUid>,
+    sources: Vec<&'a SourceToSchedule>,
     indexer_id_to_indexer_ord: FnvHashMap<String, IndexerOrd>,
     source_uid_to_source_ord: FnvHashMap<SourceUid, SourceOrd>,
 }
 
-impl IdToOrdMap {
-    fn add_source_uid(&mut self, source_uid: SourceUid) -> SourceOrd {
+impl<'a> IdToOrdMap<'a> {
+    // All sources added are required to have distinct source uids.
+    fn add_source(&mut self, source: &'a SourceToSchedule) -> SourceOrd {
         let source_ord = self.source_uid_to_source_ord.len() as SourceOrd;
-        self.source_uid_to_source_ord
-            .insert(source_uid.clone(), source_ord);
-        self.source_uids.push(source_uid);
+        let previous_item = self
+            .source_uid_to_source_ord
+            .insert(source.source_uid.clone(), source_ord);
+        assert!(previous_item.is_none());
+        self.sources.push(source);
         source_ord
     }
 
@@ -97,6 +100,11 @@ impl IdToOrdMap {
         self.source_uid_to_source_ord.get(source_uid).copied()
     }
 
+    fn source(&self, source_uid: &SourceUid) -> Option<(SourceOrd, &'a SourceToSchedule)> {
+        let source_ord = self.source_uid_to_source_ord.get(source_uid).copied()?;
+        Some((source_ord, self.sources[source_ord as usize]))
+    }
+
     fn indexer_ord(&self, indexer_id: &str) -> Option<IndexerOrd> {
         self.indexer_id_to_indexer_ord.get(indexer_id).copied()
     }
@@ -123,8 +131,21 @@ fn convert_physical_plan_to_solution(
                 index_uid: IndexUid::from(indexing_task.index_uid.clone()),
                 source_id: indexing_task.source_id.clone(),
             };
-            if let Some(source_ord) = id_to_ord_map.source_ord(&source_uid) {
-                indexer_assignment.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
+            if let Some((source_ord, source)) = id_to_ord_map.source(&source_uid) {
+                match &source.source_type {
+                    SourceToScheduleType::Sharded { .. } => {
+                        indexer_assignment
+                            .add_shards(source_ord, indexing_task.shard_ids.len() as u32);
+                    }
+                    SourceToScheduleType::NonSharded { .. } => {
+                        // For non-sharded sources like Kafka, one pipeline = one shard in the
+                        // solution.
+                        indexer_assignment.add_shards(source_ord, 1);
+                    }
+                    SourceToScheduleType::IngestV1 => {
+                        // Ingest V1 is not part of the logical placement algorithm.
+                    }
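The reworked IdToOrdMap above keeps a reference to each SourceToSchedule instead of only its SourceUid, so one lookup now yields both the dense ordinal used by the solver and the source's type, and the new assert catches duplicate registrations. A minimal self-contained sketch of the ordinal-mapping idea, with String keys standing in for SourceUid (simplified, hypothetical types, not the production struct):

    use std::collections::HashMap;

    #[derive(Default)]
    struct IdToOrd {
        keys: Vec<String>,
        key_to_ord: HashMap<String, u32>,
    }

    impl IdToOrd {
        // Ordinals are dense: 0, 1, 2, ... in insertion order, which lets the
        // solver index plain vectors instead of hashing keys everywhere.
        fn add(&mut self, key: &str) -> u32 {
            let ord = self.key_to_ord.len() as u32;
            let previous = self.key_to_ord.insert(key.to_string(), ord);
            assert!(previous.is_none(), "duplicate key registered");
            self.keys.push(key.to_string());
            ord
        }

        fn ord(&self, key: &str) -> Option<u32> {
            self.key_to_ord.get(key).copied()
        }
    }

    fn main() {
        let mut map = IdToOrd::default();
        assert_eq!(map.add("source-a"), 0);
        assert_eq!(map.add("source-b"), 1);
        assert_eq!(map.ord("source-b"), Some(1));
    }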
+                    }
+                }
             }
         }
     }
@@ -310,16 +331,17 @@ fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &str> + '_ {
 fn convert_scheduling_solution_to_physical_plan(
-    mut solution: SchedulingSolution,
+    solution: &SchedulingSolution,
     id_to_ord_map: &IdToOrdMap,
     sources: &[SourceToSchedule],
     previous_plan_opt: Option<&PhysicalIndexingPlan>,
 ) -> PhysicalIndexingPlan {
+    let mut indexer_assignments = solution.indexer_assignments.clone();
     let mut new_physical_plan = PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids);
     for (indexer_id, indexer_assignment) in id_to_ord_map
         .indexer_ids
         .iter()
-        .zip(&mut solution.indexer_assignments)
+        .zip(&mut indexer_assignments)
     {
         let previous_tasks_for_indexer = previous_plan_opt
             .and_then(|previous_plan| previous_plan.indexer(indexer_id))
@@ -349,7 +371,7 @@ fn convert_scheduling_solution_to_physical_plan(
         for (indexer, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer_mut() {
             let indexer_ord = id_to_ord_map.indexer_ord(indexer).unwrap();
             let mut num_shards_for_indexer_source: u32 =
-                solution.indexer_assignments[indexer_ord].num_shards(source_ord);
+                indexer_assignments[indexer_ord].num_shards(source_ord);
             for indexing_task in indexing_tasks {
                 if indexing_task.index_uid == source.source_uid.index_uid.as_str()
                     && indexing_task.source_id == source.source_uid.source_id
@@ -400,6 +422,20 @@ fn convert_scheduling_solution_to_physical_plan(
     new_physical_plan
 }
 
+// Checks that the physical plan indeed matches the scheduling solution.
+fn assert_post_condition_physical_plan_match_solution(
+    physical_plan: &PhysicalIndexingPlan,
+    solution: &SchedulingSolution,
+    id_to_ord_map: &IdToOrdMap,
+) {
+    let num_indexers = physical_plan.indexing_tasks_per_indexer().len();
+    assert_eq!(num_indexers, solution.indexer_assignments.len());
+    assert_eq!(num_indexers, id_to_ord_map.indexer_ids.len());
+    let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
+    convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut reconstructed_solution);
+    assert_eq!(solution, &reconstructed_solution);
+}
+
 fn add_shard_to_indexer(
     missing_shard: ShardId,
     indexer: &str,
@@ -435,6 +471,35 @@ fn add_shard_to_indexer(
     }
 }
 
+// If the total node capacity is lower than 120% of the problem load, this
+// function scales up the indexer capacities to reach this limit.
+fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
+    // First we scale the problem to the point where any indexer can fit the largest shard.
+    let Some(largest_shard_load) = problem.sources().map(|source| source.load_per_shard).max()
+    else {
+        return;
+    };
+    let min_indexer_capacity = (0..problem.num_indexers())
+        .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
+        .min()
+        .expect("At least one indexer is required");
+    assert_ne!(min_indexer_capacity.cpu_millis(), 0);
+    if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
+        let scaling_factor =
+            (largest_shard_load.get() as f32) / (min_indexer_capacity.cpu_millis() as f32);
+        problem.scale_node_capacities(scaling_factor);
+    }
+
+    let total_node_capacities: f32 = problem.total_node_capacities().cpu_millis() as f32;
+    let total_load: f32 = problem.total_load() as f32;
+    let inflated_total_load = total_load * 1.2f32;
+    if inflated_total_load >= total_node_capacities {
+        // We need to inflate our node capacities to match the problem.
+        let ratio = inflated_total_load / total_node_capacities;
+        problem.scale_node_capacities(ratio);
+    }
+}
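To make the two-step inflation concrete, here is a worked sketch of the same logic on plain mCPU integers (a simplified stand-in, not the production SchedulingProblem API). With two 500 mCPU indexers and a single 1,000 mCPU shard, step one scales every capacity by 1000 / 500 = 2; step two then sees an inflated load of 1,000 x 1.2 = 1,200 mCPU, below the new 2,000 mCPU total, so it is a no-op:

    fn inflate_capacities(capacities: &mut [u32], largest_shard_load: u32, total_load: u32) {
        // Step 1: every indexer must be able to host the largest shard.
        let min_capacity = *capacities.iter().min().expect("at least one indexer");
        if min_capacity < largest_shard_load {
            let factor = largest_shard_load as f32 / min_capacity as f32;
            for capacity in capacities.iter_mut() {
                *capacity = (*capacity as f32 * factor).ceil() as u32;
            }
        }
        // Step 2: the total capacity must reach 120% of the total load.
        let total_capacity: u32 = capacities.iter().sum();
        let inflated_load = total_load as f32 * 1.2f32;
        if inflated_load >= total_capacity as f32 {
            let ratio = inflated_load / total_capacity as f32;
            for capacity in capacities.iter_mut() {
                *capacity = (*capacity as f32 * ratio).ceil() as u32;
            }
        }
    }

    fn main() {
        // Two 500 mCPU indexers, one source with a single 1_000 mCPU shard.
        let mut capacities = vec![500u32, 500u32];
        inflate_capacities(&mut capacities, 1_000, 1_000);
        // Step 1 doubles each capacity; step 2 leaves them untouched.
        assert_eq!(capacities, vec![1_000, 1_000]);
    }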
 /// Creates a physical plan given the current situation of the cluster and the list of sources
 /// to schedule.
 ///
@@ -477,9 +542,10 @@ pub fn build_physical_indexing_plan(
     }
 
     let mut problem = SchedulingProblem::with_indexer_cpu_capacities(indexer_cpu_capacities);
+
     for source in sources {
         if let Some(source_ord) = populate_problem(source, &mut problem) {
-            let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
+            let registered_source_ord = id_to_ord_map.add_source(source);
             assert_eq!(source_ord, registered_source_ord);
         }
     }
@@ -491,22 +557,23 @@ pub fn build_physical_indexing_plan(
     }
 
     // Compute the new scheduling solution
-    let (new_solution, unassigned_shards) = scheduling_logic::solve(&problem, previous_solution);
-
-    if !unassigned_shards.is_empty() {
-        // TODO this is probably a bad idea to just not overschedule, as having a single index trail
-        // behind will prevent the log GC.
-        // A better strategy would probably be to close shard, and start prevent ingestion.
-        error!("unable to assign all sources in the cluster");
-    }
+    let new_solution = scheduling_logic::solve(problem, previous_solution);
 
     // Convert the new scheduling solution back to a physical plan.
-    convert_scheduling_solution_to_physical_plan(
-        new_solution,
+    let new_physical_plan = convert_scheduling_solution_to_physical_plan(
+        &new_solution,
         &id_to_ord_map,
         sources,
         previous_plan_opt,
-    )
+    );
+
+    assert_post_condition_physical_plan_match_solution(
+        &new_physical_plan,
+        &new_solution,
+        &id_to_ord_map,
+    );
+
+    new_physical_plan
 }
 
 #[cfg(test)]
@@ -609,7 +676,7 @@ mod tests {
         let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
         assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
         let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
-        assert_eq!(expected_tasks.len(), 1);
+        assert_eq!(expected_tasks.len(), 2);
         assert_eq!(&expected_tasks[0].source_id, &source_uid1.source_id);
     }
     {
@@ -808,4 +875,34 @@ mod tests {
         let indexers: Vec<&str> = super::pick_indexer(&indexer_capacity).collect();
         assert_eq!(indexers, &["node1", "node3", "node3", "node4", "node4"]);
     }
+
+    #[test]
+    fn test_solution_reconstruction() {
+        let sources_to_schedule = vec![
+            SourceToSchedule {
+                source_uid: SourceUid {
+                    index_uid: IndexUid::parse("otel-logs-v0_6:01HKYD1SE37C90KSH21CD1M11A")
+                        .unwrap(),
+                    source_id: "_ingest-api-source".to_string(),
+                },
+                source_type: SourceToScheduleType::IngestV1,
+            },
+            SourceToSchedule {
+                source_uid: SourceUid {
+                    index_uid: IndexUid::parse(
+                        "simian_chico_12856033706389338959:01HKYD414H1WVSASC5YD972P39",
+                    )
+                    .unwrap(),
+                    source_id: "_ingest-source".to_string(),
+                },
+                source_type: SourceToScheduleType::Sharded {
+                    shard_ids: vec![1],
+                    load_per_shard: NonZeroU32::new(250).unwrap(),
+                },
+            },
+        ];
+        let mut capacities = FnvHashMap::default();
+        capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
+        build_physical_indexing_plan(&sources_to_schedule, &capacities, None);
+    }
 }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index a8c3b069804..cc5dbbd1efb 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -22,8 +22,10 @@ use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, BinaryHeap, HashMap};
 
 use quickwit_proto::indexing::CpuCapacity;
+use tracing::warn;
 
 use super::scheduling_logic_model::*;
+use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
 
 // ------------------------------------------------------------------------------------
 // High level algorithm
@@ -39,17 +41,17 @@ fn check_contract_conditions(problem: &SchedulingProblem, solution: &SchedulingSolution) {
 }
 
 pub fn solve(
-    problem: &SchedulingProblem,
+    mut problem: SchedulingProblem,
     previous_solution: SchedulingSolution,
-) -> (SchedulingSolution, BTreeMap<SourceOrd, u32>) {
+) -> SchedulingSolution {
+    // We first inflate the indexer capacities to make sure they globally have at least 120% of the
+    // total problem load.
+    inflate_node_capacities_if_necessary(&mut problem);
     let mut solution = previous_solution;
-    check_contract_conditions(problem, &solution);
-    remove_extraneous_shards(problem, &mut solution);
-    enforce_indexers_cpu_capacity(problem, &mut solution);
-    let still_unassigned = place_unassigned_shards(problem, &mut solution);
-    // TODO ideally we should have some smarter logic here to bread first search for a better
-    // solution.
-    (solution, still_unassigned)
+    check_contract_conditions(&problem, &solution);
+    remove_extraneous_shards(&problem, &mut solution);
+    enforce_indexers_cpu_capacity(&problem, &mut solution);
+    place_unassigned_shards(problem, solution)
 }
 
 // -------------------------------------------------------------------------
@@ -88,7 +90,7 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
         }
     }
 
-    let mut indexer_available_capacity: Vec<CpuCapacity> = solution
+    let mut indexer_available_capacity: Vec<i32> = solution
         .indexer_assignments
         .iter()
         .map(|indexer_assignment| indexer_assignment.indexer_available_capacity(problem))
         .collect();
@@ -154,10 +156,11 @@ fn enforce_indexer_cpu_capacity(
     indexer_assignment: &mut IndexerAssignment,
 ) {
     let total_load = indexer_assignment.total_cpu_load(problem);
-    if total_load <= indexer_cpu_capacity {
+    if total_load <= indexer_cpu_capacity.cpu_millis() {
         return;
     }
-    let mut load_to_remove: CpuCapacity = total_load - indexer_cpu_capacity;
+    let mut load_to_remove: CpuCapacity =
+        CpuCapacity::from_cpu_millis(total_load) - indexer_cpu_capacity;
     let mut source_cpu_capacities: Vec<(CpuCapacity, SourceOrd)> = indexer_assignment
         .num_shards_per_source
         .iter()
@@ -183,7 +186,31 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
     indexer_assignment: &IndexerAssignment,
 ) {
     let total_load = indexer_assignment.total_cpu_load(problem);
-    assert!(total_load <= problem.indexer_cpu_capacity(indexer_assignment.indexer_ord));
+    assert!(
+        total_load
+            <= problem
+                .indexer_cpu_capacity(indexer_assignment.indexer_ord)
+                .cpu_millis()
+    );
+}
+
+fn attempt_place_unassigned_shards(
+    unassigned_shards: &[Source],
+    problem: &SchedulingProblem,
+    partial_solution: &SchedulingSolution,
+) -> Result<SchedulingSolution, NotEnoughCapacity> {
+    let mut solution = partial_solution.clone();
+    let mut indexers_with_most_available_capacity: BinaryHeap<(CpuCapacity, IndexerOrd)> =
+        compute_indexer_available_capacity(problem, &solution);
+    for source in unassigned_shards {
+        place_unassigned_shards_single_source(
+            source,
+            &mut indexers_with_most_available_capacity,
+            &mut solution,
+        )?;
+    }
+    assert_place_unassigned_shards_post_condition(problem, &solution);
+    Ok(solution)
 }
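The placement pass above is a greedy bin-packing heuristic: each batch of shards goes to whichever indexer currently has the most available capacity, tracked in a max-heap. A self-contained sketch of that heap discipline with plain mCPU integers (simplified types; the production code uses peek_mut on CpuCapacity rather than pop/push):

    use std::collections::BinaryHeap;

    // Places `num_shards` shards of `load_per_shard` mCPU, always picking the
    // node with the most available capacity. Returns Err(()) if they cannot fit.
    fn place_greedy(
        heap: &mut BinaryHeap<(u32, usize)>, // (available mCPU, node ord)
        num_shards: u32,
        load_per_shard: u32,
    ) -> Result<Vec<(usize, u32)>, ()> {
        let mut placements = Vec::new();
        let mut remaining = num_shards;
        while remaining > 0 {
            let (available, node_ord) = heap.pop().ok_or(())?;
            let num_placable = available / load_per_shard;
            let num_to_place = num_placable.min(remaining);
            if num_to_place == 0 {
                // Even the roomiest node cannot take a single shard.
                return Err(());
            }
            placements.push((node_ord, num_to_place));
            remaining -= num_to_place;
            // Push the node back with its reduced capacity.
            heap.push((available - num_to_place * load_per_shard, node_ord));
        }
        Ok(placements)
    }

    fn main() {
        let mut heap = BinaryHeap::from(vec![(3_000u32, 0usize), (2_000u32, 1usize)]);
        // 4 shards x 1_000 mCPU: 3 fit on node 0, the last one goes to node 1.
        let placements = place_greedy(&mut heap, 4, 1_000).unwrap();
        assert_eq!(placements, vec![(0, 3), (1, 1)]);
    }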
 
 // ----------------------------------------------------
@@ -195,84 +222,85 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
 //
 // We then try to place as many shards as possible in the node with the
 // highest available capacity.
-
+//
+// If this algorithm fails to place all remaining shards, we inflate the node capacities by 20%
+// in the scheduling problem and start from the beginning.
+#[must_use]
 fn place_unassigned_shards(
-    problem: &SchedulingProblem,
-    solution: &mut SchedulingSolution,
-) -> BTreeMap<SourceOrd, u32> {
-    let mut unassigned_shards: Vec<Source> = compute_unassigned_sources(problem, solution);
+    mut problem: SchedulingProblem,
+    partial_solution: SchedulingSolution,
+) -> SchedulingSolution {
+    let mut unassigned_shards: Vec<Source> =
+        compute_unassigned_sources(&problem, &partial_solution);
     unassigned_shards.sort_by_key(|source| {
         let load = source.num_shards * source.load_per_shard.get();
         Reverse(load)
     });
-    let mut indexers_with_most_available_capacity: BinaryHeap<(CpuCapacity, IndexerOrd)> =
-        compute_indexer_available_capacity(problem, solution);
-    let mut unassignable_shards = BTreeMap::new();
-    for source in unassigned_shards {
-        let num_shards_unassigned = place_unassigned_shards_single_source(
-            &source,
-            &mut indexers_with_most_available_capacity,
-            solution,
-        );
-        // We haven't been able to place this source entirely.
-        if num_shards_unassigned != 0 {
-            unassignable_shards.insert(source.source_ord, num_shards_unassigned);
+
+    // Thanks to the call to `inflate_node_capacities_if_necessary`,
+    // we are certain that even on our first attempt, the total indexer capacity exceeds 120%
+    // of the total problem load.
+    //
+    // 1.2^30 is about 240.
+    // If we reach 30 attempts we are certain to have a logical bug.
+    for attempt_number in 0..30 {
+        match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, &partial_solution) {
+            Ok(solution) => {
+                if attempt_number != 0 {
+                    warn!(
+                        attempt_number = attempt_number,
+                        "required to scale node capacity"
+                    );
+                }
+                return solution;
+            }
+            Err(NotEnoughCapacity) => {
+                problem.scale_node_capacities(1.2f32);
+            }
         }
     }
-    assert_place_unassigned_shards_post_condition(problem, solution, &unassignable_shards);
-    unassignable_shards
+    unreachable!("Failed to assign all of the sources");
 }
 
 fn assert_place_unassigned_shards_post_condition(
     problem: &SchedulingProblem,
     solution: &SchedulingSolution,
-    unassigned_shards: &BTreeMap<SourceOrd, u32>,
 ) {
-    // We make sure we all shard are cound as place or unassigned.
+    // We make sure that all shards are placed.
     for source in problem.sources() {
         let num_assigned_shards: u32 = solution
             .indexer_assignments
             .iter()
             .map(|indexer_assignment| indexer_assignment.num_shards(source.source_ord))
             .sum();
-        assert_eq!(
-            num_assigned_shards
-                + unassigned_shards
-                    .get(&source.source_ord)
-                    .copied()
-                    .unwrap_or(0),
-            source.num_shards
-        );
+        assert_eq!(num_assigned_shards, source.num_shards);
     }
-    // We make sure that all unassigned shard cannot be placed.
+    // We make sure that the node capacity is respected.
     for indexer_assignment in &solution.indexer_assignments {
-        let available_capacity: CpuCapacity =
-            indexer_assignment.indexer_available_capacity(problem);
-        for (&source_ord, &num_shards) in unassigned_shards {
-            assert!(num_shards > 0);
-            let source = problem.source(source_ord);
-            assert!(source.load_per_shard.get() > available_capacity.cpu_millis());
-        }
+        // We call this function just to check that the indexer assignment does not exceed its
+        // capacity (it includes an assert that panics if it does).
+        assert_enforce_nodes_cpu_capacity_post_condition(problem, indexer_assignment);
     }
 }
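A quick sanity check on the retry bound used above: each failed attempt scales every capacity by 1.2, so 30 attempts compound to a factor of 1.2^30 ≈ 237, which is where the "about 240" in the comment comes from. A trivially runnable check of that arithmetic:

    fn main() {
        // Each failed placement attempt scales node capacities by 1.2.
        let growth_after_30_attempts = 1.2f64.powi(30);
        // 1.2^30 ~= 237.4, i.e. the "about 240" cited in the comment above.
        assert!((200.0..250.0).contains(&growth_after_30_attempts));
        println!("capacity growth after 30 attempts: {growth_after_30_attempts:.1}x");
    }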
 
+struct NotEnoughCapacity;
+
+/// Returns Err(NotEnoughCapacity) iff the algorithm was unable to pack all of the sources
+/// among the nodes given their capacity.
 fn place_unassigned_shards_single_source(
     source: &Source,
     indexer_available_capacities: &mut BinaryHeap<(CpuCapacity, IndexerOrd)>,
     solution: &mut SchedulingSolution,
-) -> u32 {
+) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
     while num_shards > 0 {
-        let Some(mut node_with_most_capacity) = indexer_available_capacities.peek_mut() else {
-            break;
-        };
+        let mut node_with_most_capacity = indexer_available_capacities.peek_mut().unwrap();
         let node_id = node_with_most_capacity.1;
         let available_capacity = &mut node_with_most_capacity.0;
         let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
         let num_shards_to_place = num_placable_shards.min(num_shards);
-        // We cannot place more shards with this load.
         if num_shards_to_place == 0 {
-            break;
+            return Err(NotEnoughCapacity);
         }
         // TODO take in account colocation.
         // Update the solution, the shard load, and the number of shards to place.
@@ -281,7 +309,7 @@ fn place_unassigned_shards_single_source(
             - CpuCapacity::from_cpu_millis(num_shards_to_place * source.load_per_shard.get());
         num_shards -= num_shards_to_place;
     }
-    num_shards
+    Ok(())
 }
 
 fn compute_unassigned_sources(
@@ -309,6 +337,9 @@ fn compute_unassigned_sources(
     unassigned_sources.into_values().collect()
 }
 
+/// Builds a BinaryHeap with the different indexer capacities.
+///
+/// Panics if one of the indexers is over-assigned.
 fn compute_indexer_available_capacity(
     problem: &SchedulingProblem,
     solution: &SchedulingSolution,
@@ -316,8 +347,12 @@ fn compute_indexer_available_capacity(
     let mut indexer_available_capacity: BinaryHeap<(CpuCapacity, IndexerOrd)> =
         BinaryHeap::with_capacity(problem.num_indexers());
     for indexer_assignment in &solution.indexer_assignments {
-        let available_capacity = indexer_assignment.indexer_available_capacity(problem);
-        indexer_available_capacity.push((available_capacity, indexer_assignment.indexer_ord));
+        let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem);
+        assert!(available_capacity >= 0i32);
+        indexer_available_capacity.push((
+            CpuCapacity::from_cpu_millis(available_capacity as u32),
+            indexer_assignment.indexer_ord,
+        ));
     }
     indexer_available_capacity
 }
@@ -437,8 +472,7 @@ mod tests {
 
     #[test]
     fn test_compute_unassigned_shards_simple() {
-        let mut problem =
-            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(0), mcpu(4_000)]);
+        let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4_000)]);
         problem.add_source(4, NonZeroU32::new(1000).unwrap());
         problem.add_source(4, NonZeroU32::new(1_000).unwrap());
         let solution = problem.new_solution();
@@ -488,10 +522,9 @@ mod tests {
     fn test_place_unassigned_shards_simple() {
         let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4_000)]);
         problem.add_source(4, NonZeroU32::new(1_000).unwrap());
-        let mut solution = problem.new_solution();
-        let unassigned = place_unassigned_shards(&problem, &mut solution);
+        let partial_solution = problem.new_solution();
+        let solution = place_unassigned_shards(problem, partial_solution);
         assert_eq!(solution.indexer_assignments[0].num_shards(0), 4);
-        assert!(unassigned.is_empty());
     }
 
     #[test]
@@ -534,13 +567,13 @@ mod tests {
         problem.add_source(43, NonZeroU32::new(1).unwrap());
         problem.add_source(379, NonZeroU32::new(1).unwrap());
         let previous_solution = problem.new_solution();
-        solve(&problem, previous_solution);
+        solve(problem, previous_solution);
     }
 
     fn indexer_cpu_capacity_strat() -> impl Strategy<Value = CpuCapacity> {
         prop_oneof![
-            0u32..10_000u32,
-            Just(0u32),
+            1u32..10_000u32,
+            Just(1u32),
             800u32..1200u32,
             1900u32..2100u32,
         ]
@@ -584,7 +617,7 @@ mod tests {
     fn num_nodes_strat() -> impl Strategy<Value = usize> {
         prop_oneof![
-            3 => 0usize..3,
+            3 => 1usize..3,
             1 => 4usize..10,
         ]
     }
@@ -628,25 +661,20 @@ mod tests {
     }
 
     #[test]
-    fn test_problem_leading_to_zero_shard() {
-        let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
-            CpuCapacity::from_cpu_millis(0),
-            CpuCapacity::from_cpu_millis(0),
-        ]);
-        problem.add_source(0, NonZeroU32::new(1).unwrap());
+    fn test_problem_missing_capacities() {
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![CpuCapacity::from_cpu_millis(100)]);
+        problem.add_source(1, NonZeroU32::new(1).unwrap());
         let mut previous_solution = problem.new_solution();
         previous_solution.indexer_assignments[0].add_shards(0, 0);
-        previous_solution.indexer_assignments[1].add_shards(0, 0);
-        let (solution, still_unassigned) = solve(&problem, previous_solution);
-        assert_eq!(solution.indexer_assignments[0].num_shards(0), 0);
-        assert_eq!(solution.indexer_assignments[1].num_shards(0), 0);
-        assert!(still_unassigned.is_empty());
+        let solution = solve(problem, previous_solution);
+        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
     }
 
     proptest! {
         #[test]
         fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
-            solve(&problem, solution);
+            solve(problem, solution);
         }
     }
 }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
index be16c522c21..e503edaa45e 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
@@ -40,6 +40,23 @@ pub struct SchedulingProblem {
 }
 
 impl SchedulingProblem {
+    /// Problem constructor.
+    ///
+    /// Panics if the list of indexers is empty or if one of the
+    /// indexers has a null capacity.
+    pub fn with_indexer_cpu_capacities(
+        indexer_cpu_capacities: Vec<CpuCapacity>,
+    ) -> SchedulingProblem {
+        assert!(!indexer_cpu_capacities.is_empty());
+        assert!(indexer_cpu_capacities
+            .iter()
+            .all(|cpu_capacity| cpu_capacity.cpu_millis() > 0));
+        SchedulingProblem {
+            sources: Vec::new(),
+            indexer_cpu_capacities,
+        }
+    }
+
     pub fn new_solution(&self) -> SchedulingSolution {
         SchedulingSolution::with_num_indexers(self.indexer_cpu_capacities.len())
     }
@@ -48,21 +65,32 @@ impl SchedulingProblem {
         self.indexer_cpu_capacities[indexer_ord]
     }
 
-    pub fn with_indexer_cpu_capacities(
-        indexer_cpu_capacities: Vec<CpuCapacity>,
-    ) -> SchedulingProblem {
-        SchedulingProblem {
-            sources: Vec::new(),
-            indexer_cpu_capacities,
+    /// Scales the cpu capacity by the given scaling factor.
+    ///
+    /// Resulting cpu capacities are rounded up to the next integer millicpu value.
+    pub fn scale_node_capacities(&mut self, scale: f32) {
+        for capacity in &mut self.indexer_cpu_capacities {
+            let scaled_cpu_millis = (capacity.cpu_millis() as f32 * scale).ceil() as u32;
+            *capacity = CpuCapacity::from_cpu_millis(scaled_cpu_millis);
         }
     }
 
-    pub fn sources(&self) -> impl Iterator<Item = Source> + '_ {
-        self.sources.iter().copied()
+    pub fn total_node_capacities(&self) -> CpuCapacity {
+        self.indexer_cpu_capacities
+            .iter()
+            .copied()
+            .fold(CpuCapacity::zero(), |left, right| left + right)
+    }
+
+    pub fn total_load(&self) -> u32 {
+        self.sources
+            .iter()
+            .map(|source| source.num_shards * source.load_per_shard.get())
+            .sum()
     }
 
-    pub fn source(&self, source_ord: SourceOrd) -> Source {
-        self.sources[source_ord as usize]
+    pub fn sources(&self) -> impl Iterator<Item = Source> + '_ {
+        self.sources.iter().copied()
     }
 
     pub fn add_source(&mut self, num_shards: u32, load_per_shard: NonZeroU32) -> SourceOrd {
@@ -88,7 +116,7 @@ impl SchedulingProblem {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct IndexerAssignment {
     pub indexer_ord: IndexerOrd,
     pub num_shards_per_source: BTreeMap<SourceOrd, u32>,
@@ -102,25 +130,21 @@ impl IndexerAssignment {
         }
     }
 
-    pub fn indexer_available_capacity(&self, problem: &SchedulingProblem) -> CpuCapacity {
-        let total_cpu_capacity = self.total_cpu_load(problem);
+    /// Returns the number of available `mcpu` in the indexer.
+    /// If the indexer is over-assigned this method returns a negative value.
+    pub fn indexer_available_capacity(&self, problem: &SchedulingProblem) -> i32 {
+        let total_cpu_load = self.total_cpu_load(problem);
         let indexer_cpu_capacity = problem.indexer_cpu_capacities[self.indexer_ord];
-        if indexer_cpu_capacity <= total_cpu_capacity {
-            CpuCapacity::zero()
-        } else {
-            indexer_cpu_capacity - total_cpu_capacity
-        }
+        indexer_cpu_capacity.cpu_millis() as i32 - total_cpu_load as i32
     }
 
-    pub fn total_cpu_load(&self, problem: &SchedulingProblem) -> CpuCapacity {
-        CpuCapacity::from_cpu_millis(
-            self.num_shards_per_source
-                .iter()
-                .map(|(source_ord, num_shards)| {
-                    problem.source_load_per_shard(*source_ord).get() * num_shards
-                })
-                .sum(),
-        )
+    pub fn total_cpu_load(&self, problem: &SchedulingProblem) -> u32 {
+        self.num_shards_per_source
+            .iter()
+            .map(|(source_ord, num_shards)| {
+                problem.source_load_per_shard(*source_ord).get() * num_shards
+            })
+            .sum()
     }
 
     pub fn num_shards(&self, source_ord: SourceOrd) -> u32 {
@@ -150,7 +174,7 @@ impl IndexerAssignment {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct SchedulingSolution {
     pub indexer_assignments: Vec<IndexerAssignment>,
 }
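The switch to a signed available capacity matters because the previous version clamped over-assignment to zero, making an overloaded indexer indistinguishable from an exactly full one. A standalone sketch of the before/after behavior, with plain integers standing in for CpuCapacity (simplified, not the production API):

    // Old behavior: over-assignment clamps to 0, indistinguishable from "exactly full".
    fn available_capacity_clamped(capacity_mcpu: u32, load_mcpu: u32) -> u32 {
        capacity_mcpu.saturating_sub(load_mcpu)
    }

    // New behavior: a negative result exposes by how much the indexer is over-assigned.
    fn available_capacity_signed(capacity_mcpu: u32, load_mcpu: u32) -> i32 {
        capacity_mcpu as i32 - load_mcpu as i32
    }

    fn main() {
        assert_eq!(available_capacity_clamped(4_000, 5_000), 0);
        assert_eq!(available_capacity_signed(4_000, 5_000), -1_000);
        // compute_indexer_available_capacity asserts >= 0 before building its heap,
        // so over-assignment now fails fast instead of being silently masked.
        assert!(available_capacity_signed(4_000, 3_000) >= 0);
    }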