Skip to content

Commit

Permalink
Assign all sources/shards, even if this requires exceeding the indexer (
Browse files Browse the repository at this point in the history
…#4363)

* Assign all sources/shards, even if this requires exceeding the indexer
capacities.

We do that simply by scaling the indexer capacities.
First we scale the indexer capacities so that the sum of their
capacities exceeds the total load of the source to assign.

If the packing algorithm fails, we inflate the capacities again
by 10% and retry.

Added an assertion on the conversion of the scheduling solution to the physical plan.

Closes #4290

* bugfix

* Removing implicit contract following CR comment
  • Loading branch information
fulmicoton authored Jan 15, 2024
1 parent a853103 commit 2c21350
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 171 deletions.
54 changes: 16 additions & 38 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,29 @@ impl IndexingScheduler {
// has happened.
pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
if indexers.is_empty() {
warn!("no indexer available, cannot schedule an indexing plan");
return;
};

let sources = get_sources_to_schedule(model);

let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();

let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
.map(|(indexer_id, indexer_node_info)| {
(indexer_id.to_string(), indexer_node_info.indexing_capacity)
.filter_map(|(indexer_id, indexer_node_info)| {
if indexer_node_info.indexing_capacity.cpu_millis() > 0 {
Some((indexer_id.to_string(), indexer_node_info.indexing_capacity))
} else {
None
}
})
.collect();

if indexer_id_to_cpu_capacities.is_empty() {
if !sources.is_empty() {
warn!("no indexing capacity available, cannot schedule an indexing plan");
}
return;
};

let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_id_to_cpu_capacities,
Expand Down Expand Up @@ -808,37 +816,7 @@ mod tests {
let indexer_id = format!("indexer-{i}");
indexer_max_loads.insert(indexer_id, mcpu(4_000));
}
let physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
let source_map: FnvHashMap<&SourceUid, &SourceToSchedule> = sources
.iter()
.map(|source| (&source.source_uid, source))
.collect();
for (node_id, tasks) in physical_indexing_plan.indexing_tasks_per_indexer() {
let mut load_in_node = 0u32;
for task in tasks {
let source_uid = SourceUid {
index_uid: IndexUid::from(task.index_uid.clone()),
source_id: task.source_id.clone(),
};
let source_to_schedule = source_map.get(&source_uid).unwrap();
match &source_to_schedule.source_type {
SourceToScheduleType::IngestV1 => {}
SourceToScheduleType::Sharded {
load_per_shard,
..
} => {
load_in_node += load_per_shard.get() * task.shard_ids.len() as u32;
}
SourceToScheduleType::NonSharded {
num_pipelines: _ ,
load_per_pipeline,
} => {
load_in_node += load_per_pipeline.get();
}
}
}
assert!(load_in_node <= indexer_max_loads.get(node_id).unwrap().cpu_millis());
}
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
}
}

Expand Down
147 changes: 122 additions & 25 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,34 @@ fn populate_problem(
}

#[derive(Default)]
struct IdToOrdMap {
struct IdToOrdMap<'a> {
indexer_ids: Vec<String>,
source_uids: Vec<SourceUid>,
sources: Vec<&'a SourceToSchedule>,
indexer_id_to_indexer_ord: FnvHashMap<String, IndexerOrd>,
source_uid_to_source_ord: FnvHashMap<SourceUid, SourceOrd>,
}

impl IdToOrdMap {
fn add_source_uid(&mut self, source_uid: SourceUid) -> SourceOrd {
impl<'a> IdToOrdMap<'a> {
// All sources added are required to have distinct source uids.
fn add_source(&mut self, source: &'a SourceToSchedule) -> SourceOrd {
let source_ord = self.source_uid_to_source_ord.len() as SourceOrd;
self.source_uid_to_source_ord
.insert(source_uid.clone(), source_ord);
self.source_uids.push(source_uid);
let previous_item = self
.source_uid_to_source_ord
.insert(source.source_uid.clone(), source_ord);
assert!(previous_item.is_none());
self.sources.push(&source);
source_ord
}

fn source_ord(&self, source_uid: &SourceUid) -> Option<SourceOrd> {
self.source_uid_to_source_ord.get(source_uid).copied()
}

fn source(&self, source_uid: &SourceUid) -> Option<(SourceOrd, &'a SourceToSchedule)> {
let source_ord = self.source_uid_to_source_ord.get(source_uid).copied()?;
Some((source_ord, self.sources[source_ord as usize]))
}

fn indexer_ord(&self, indexer_id: &str) -> Option<IndexerOrd> {
self.indexer_id_to_indexer_ord.get(indexer_id).copied()
}
Expand All @@ -123,8 +131,21 @@ fn convert_physical_plan_to_solution(
index_uid: IndexUid::from(indexing_task.index_uid.clone()),
source_id: indexing_task.source_id.clone(),
};
if let Some(source_ord) = id_to_ord_map.source_ord(&source_uid) {
indexer_assignment.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
if let Some((source_ord, source)) = id_to_ord_map.source(&source_uid) {
match &source.source_type {
SourceToScheduleType::Sharded { .. } => {
indexer_assignment
.add_shards(source_ord, indexing_task.shard_ids.len() as u32);
}
SourceToScheduleType::NonSharded { .. } => {
// For non-sharded sources like Kafka, one pipeline = one shard in the
// solutions
indexer_assignment.add_shards(source_ord, 1);
}
SourceToScheduleType::IngestV1 => {
// Ingest V1 is not part of the logical placement algorithm.
}
}
}
}
}
Expand Down Expand Up @@ -310,16 +331,17 @@ fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &st
/// We do not support moving a shard from one pipeline to another, so if required this function
/// may also return instructions about deleting / adding new shards.
fn convert_scheduling_solution_to_physical_plan(
mut solution: SchedulingSolution,
solution: &SchedulingSolution,
id_to_ord_map: &IdToOrdMap,
sources: &[SourceToSchedule],
previous_plan_opt: Option<&PhysicalIndexingPlan>,
) -> PhysicalIndexingPlan {
let mut indexer_assignments = solution.indexer_assignments.clone();
let mut new_physical_plan = PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids);
for (indexer_id, indexer_assignment) in id_to_ord_map
.indexer_ids
.iter()
.zip(&mut solution.indexer_assignments)
.zip(&mut indexer_assignments)
{
let previous_tasks_for_indexer = previous_plan_opt
.and_then(|previous_plan| previous_plan.indexer(indexer_id))
Expand Down Expand Up @@ -349,7 +371,7 @@ fn convert_scheduling_solution_to_physical_plan(
for (indexer, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer_mut() {
let indexer_ord = id_to_ord_map.indexer_ord(indexer).unwrap();
let mut num_shards_for_indexer_source: u32 =
solution.indexer_assignments[indexer_ord].num_shards(source_ord);
indexer_assignments[indexer_ord].num_shards(source_ord);
for indexing_task in indexing_tasks {
if indexing_task.index_uid == source.source_uid.index_uid.as_str()
&& indexing_task.source_id == source.source_uid.source_id
Expand Down Expand Up @@ -400,6 +422,20 @@ fn convert_scheduling_solution_to_physical_plan(
new_physical_plan
}

/// Checks that the physical plan indeed matches the scheduling solution.
///
/// The physical plan is converted back into a `SchedulingSolution` with
/// `convert_physical_plan_to_solution`, and the result is compared with `solution`.
///
/// # Panics
///
/// Panics if the number of indexers in the physical plan differs from the number of indexer
/// assignments in `solution` or from the number of indexer ids in `id_to_ord_map`, or if the
/// reconstructed solution is not equal to `solution`.
fn assert_post_condition_physical_plan_match_solution(
    physical_plan: &PhysicalIndexingPlan,
    solution: &SchedulingSolution,
    id_to_ord_map: &IdToOrdMap,
) {
    let indexer_count = physical_plan.indexing_tasks_per_indexer().len();
    assert_eq!(indexer_count, solution.indexer_assignments.len());
    assert_eq!(indexer_count, id_to_ord_map.indexer_ids.len());

    // Rebuild a solution from the physical plan so that both can be compared.
    let roundtrip_solution = {
        let mut rebuilt = SchedulingSolution::with_num_indexers(indexer_count);
        convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut rebuilt);
        rebuilt
    };
    assert_eq!(solution, &roundtrip_solution);
}

fn add_shard_to_indexer(
missing_shard: ShardId,
indexer: &str,
Expand Down Expand Up @@ -435,6 +471,35 @@ fn add_shard_to_indexer(
}
}

/// Scales the indexer capacities of `problem` up when they are too small for its load.
///
/// Two successive adjustments are applied, in this order:
/// 1. if the smallest indexer capacity is lower than the largest per-shard load,
///    `scale_node_capacities` is called with the ratio between the two;
/// 2. if the total node capacity is lower than or equal to 120% of the total load,
///    `scale_node_capacities` is called with the ratio between that inflated load and the
///    total node capacity.
///
/// Does nothing if the problem has no source.
///
/// # Panics
///
/// Panics if the problem has at least one source but no indexer, or if the smallest indexer
/// capacity is zero.
fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
    let largest_shard_load = match problem.sources().map(|source| source.load_per_shard).max() {
        Some(load) => load,
        // No source means there is no load to fit.
        None => return,
    };

    // Step 1: make sure that any indexer can fit the largest shard.
    let smallest_capacity = (0..problem.num_indexers())
        .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
        .min()
        .expect("At least one indexer is required");
    let smallest_capacity_millis = smallest_capacity.cpu_millis();
    assert_ne!(smallest_capacity_millis, 0);
    let largest_shard_millis = largest_shard_load.get();
    if smallest_capacity_millis < largest_shard_millis {
        let shard_fit_factor = (largest_shard_millis as f32) / (smallest_capacity_millis as f32);
        problem.scale_node_capacities(shard_fit_factor);
    }

    // Step 2: make sure that the total capacity leaves some headroom over the total load.
    // The totals are read after step 1, since the capacities may just have been scaled.
    let total_capacity = problem.total_node_capacities().cpu_millis() as f32;
    let inflated_total_load = (problem.total_load() as f32) * 1.2f32;
    if inflated_total_load >= total_capacity {
        // We need to inflate our node capacities to match the problem.
        problem.scale_node_capacities(inflated_total_load / total_capacity);
    }
}

/// Creates a physical plan given the current situation of the cluster and the list of sources
/// to schedule.
///
Expand Down Expand Up @@ -477,9 +542,10 @@ pub fn build_physical_indexing_plan(
}

let mut problem = SchedulingProblem::with_indexer_cpu_capacities(indexer_cpu_capacities);

for source in sources {
if let Some(source_ord) = populate_problem(source, &mut problem) {
let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
let registered_source_ord = id_to_ord_map.add_source(source);
assert_eq!(source_ord, registered_source_ord);
}
}
Expand All @@ -491,22 +557,23 @@ pub fn build_physical_indexing_plan(
}

// Compute the new scheduling solution
let (new_solution, unassigned_shards) = scheduling_logic::solve(&problem, previous_solution);

if !unassigned_shards.is_empty() {
// TODO this is probably a bad idea to just not overschedule, as having a single index trail
// behind will prevent the log GC.
// A better strategy would probably be to close shard, and start prevent ingestion.
error!("unable to assign all sources in the cluster");
}
let new_solution = scheduling_logic::solve(problem, previous_solution);

// Convert the new scheduling solution back to a physical plan.
convert_scheduling_solution_to_physical_plan(
new_solution,
let new_physical_plan = convert_scheduling_solution_to_physical_plan(
&new_solution,
&id_to_ord_map,
sources,
previous_plan_opt,
)
);

assert_post_condition_physical_plan_match_solution(
&new_physical_plan,
&new_solution,
&id_to_ord_map,
);

new_physical_plan
}

#[cfg(test)]
Expand Down Expand Up @@ -609,7 +676,7 @@ mod tests {
let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 1);
assert_eq!(expected_tasks.len(), 2);
assert_eq!(&expected_tasks[0].source_id, &source_uid1.source_id);
}
{
Expand Down Expand Up @@ -808,4 +875,34 @@ mod tests {
let indexers: Vec<&str> = super::pick_indexer(&indexer_capacity).collect();
assert_eq!(indexers, &["node1", "node3", "node3", "node4", "node4"]);
}

// Builds a physical plan for one IngestV1 source and one sharded source with a single shard,
// on a single indexer. The test contains no explicit assertion: it passes as long as
// `build_physical_indexing_plan` returns without panicking.
#[test]
fn test_solution_reconstruction() {
    let ingest_v1_source = SourceToSchedule {
        source_uid: SourceUid {
            index_uid: IndexUid::parse("otel-logs-v0_6:01HKYD1SE37C90KSH21CD1M11A").unwrap(),
            source_id: "_ingest-api-source".to_string(),
        },
        source_type: SourceToScheduleType::IngestV1,
    };
    let sharded_source = SourceToSchedule {
        source_uid: SourceUid {
            index_uid: IndexUid::parse(
                "simian_chico_12856033706389338959:01HKYD414H1WVSASC5YD972P39",
            )
            .unwrap(),
            source_id: "_ingest-source".to_string(),
        },
        source_type: SourceToScheduleType::Sharded {
            shard_ids: vec![1],
            load_per_shard: NonZeroU32::new(250).unwrap(),
        },
    };
    let sources_to_schedule = vec![ingest_v1_source, sharded_source];

    let mut indexer_capacities = FnvHashMap::default();
    indexer_capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));

    build_physical_indexing_plan(&sources_to_schedule, &indexer_capacities, None);
}
}
Loading

0 comments on commit 2c21350

Please sign in to comment.