diff --git a/Cargo.toml b/Cargo.toml
index 6cd3de839d416..cab518879c362 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -204,6 +204,7 @@ sourcemap = "9.0.0"
 strsim = "0.11.1"
 syn = "1.0.107"
 tempfile = "3.3.0"
+thread_local = "1.1.8"
 thiserror = "1.0.48"
 tiny-gradient = "0.1.0"
 tokio = "1.25.0"
diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml
index a3291740669c1..0dba282710123 100644
--- a/turbopack/crates/turbo-tasks-backend/Cargo.toml
+++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
 [features]
 default = []
 verify_serialization = []
+trace_aggregation_update = []
 
 [dependencies]
 anyhow = { workspace = true }
@@ -39,7 +40,7 @@ smallvec = { workspace = true }
 tokio = { workspace = true }
 tokio-scoped = "0.2.0"
 tracing = { workspace = true }
-thread_local = { version = "1.1.8" }
+thread_local = { workspace = true }
 turbo-prehash = { workspace = true }
 turbo-tasks = { workspace = true }
 turbo-tasks-hash = { workspace = true }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
index b01d8bf4f4b85..d4f84994e370a 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -1,6 +1,7 @@
 use std::{
-    cmp::{max, Ordering},
+    cmp::{max, Ordering, Reverse},
     collections::{hash_map::Entry as HashMapEntry, VecDeque},
+    hash::Hash,
     num::NonZeroU32,
 };
 
@@ -8,7 +9,9 @@ use indexmap::map::Entry;
 use rustc_hash::{FxHashMap, FxHashSet};
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
-use turbo_tasks::{FxIndexMap, FxIndexSet, SessionId, TaskId, TraitTypeId};
+#[cfg(feature = "trace_aggregation_update")]
+use tracing::{span::Span, trace_span};
+use turbo_tasks::{FxIndexMap, SessionId, TaskId, TraitTypeId};
 
 use crate::{
     backend::{
@@ -16,13 +19,14 @@ use crate::{
             invalidate::{make_task_dirty, TaskDirtyCause},
             ExecuteContext, Operation, TaskGuard,
         },
-        storage::{get, get_many, iter_many, remove, update, update_count},
+        storage::{get, get_many, iter_many, remove, update, update_count, update_ucount_and_get},
         TaskDataCategory,
     },
     data::{
        ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef,
        DirtyContainerCount, RootState,
    },
+    utils::deque_set::DequeSet,
 };
 
 pub const LEAF_NUMBER: u32 = 16;
@@ -329,20 +333,123 @@ impl AggregatedDataUpdate {
     }
 }
 
-#[derive(Serialize, Deserialize, Clone, Copy)]
+#[derive(Serialize, Deserialize, Clone)]
 struct AggregationNumberUpdate {
     base_aggregation_number: u32,
     distance: Option<NonZeroU32>,
+    #[cfg(feature = "trace_aggregation_update")]
+    #[serde(skip, default)]
+    span: Option<Span>,
 }
 
+#[derive(Serialize, Deserialize, Clone)]
+struct AggregationUpdateJobItem {
+    job: AggregationUpdateJob,
+    #[cfg(feature = "trace_aggregation_update")]
+    #[serde(skip, default)]
+    span: Option<Span>,
+}
+
+impl AggregationUpdateJobItem {
+    fn new(job: AggregationUpdateJob) -> Self {
+        Self {
+            job,
+            #[cfg(feature = "trace_aggregation_update")]
+            span: Some(Span::current()),
+        }
+    }
+
+    fn entered(self) -> AggregationUpdateJobGuard {
+        AggregationUpdateJobGuard {
+            job: self.job,
+            #[cfg(feature = "trace_aggregation_update")]
+            _guard: self.span.map(|s| s.entered()),
+        }
+    }
+}
+
+struct AggregationUpdateJobGuard {
+    job: AggregationUpdateJob,
+    #[cfg(feature = "trace_aggregation_update")]
+    _guard: Option<tracing::span::EnteredSpan>,
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+struct BalanceJob {
+    upper_id: TaskId,
+    task_id: TaskId,
+    #[cfg(feature = "trace_aggregation_update")]
+    #[serde(skip, default)]
+    span: Option<Span>,
+}
+
+impl BalanceJob {
+    fn new(upper: TaskId, task: TaskId) -> Self {
+        Self {
+            upper_id: upper,
+            task_id: task,
+            #[cfg(feature = "trace_aggregation_update")]
+            span: Some(Span::current()),
+        }
+    }
+}
+
+impl Hash for BalanceJob {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.upper_id.hash(state);
+        self.task_id.hash(state);
+    }
+}
+
+impl PartialEq for BalanceJob {
+    fn eq(&self, other: &Self) -> bool {
+        self.upper_id == other.upper_id && self.task_id == other.task_id
+    }
+}
+
+impl Eq for BalanceJob {}
+
+#[derive(Serialize, Deserialize, Clone)]
+struct OptimizeJob {
+    task_id: TaskId,
+    #[cfg(feature = "trace_aggregation_update")]
+    #[serde(skip, default)]
+    span: Option<Span>,
+}
+
+impl OptimizeJob {
+    fn new(task: TaskId) -> Self {
+        Self {
+            task_id: task,
+            #[cfg(feature = "trace_aggregation_update")]
+            span: Some(Span::current()),
+        }
+    }
+}
+
+impl Hash for OptimizeJob {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.task_id.hash(state);
+    }
+}
+
+impl PartialEq for OptimizeJob {
+    fn eq(&self, other: &Self) -> bool {
+        self.task_id == other.task_id
+    }
+}
+
+impl Eq for OptimizeJob {}
+
 #[derive(Default, Serialize, Deserialize, Clone)]
 pub struct AggregationUpdateQueue {
-    jobs: VecDeque<AggregationUpdateJob>,
+    jobs: VecDeque<AggregationUpdateJobItem>,
     number_updates: FxIndexMap<TaskId, AggregationNumberUpdate>,
     done_number_updates: FxHashMap<TaskId, AggregationNumberUpdate>,
-    find_and_schedule: FxIndexSet<TaskId>,
+    find_and_schedule: DequeSet<TaskId>,
     done_find_and_schedule: FxHashSet<TaskId>,
-    balance_queue: FxIndexSet<(TaskId, TaskId)>,
+    balance_queue: DequeSet<BalanceJob>,
+    optimize_queue: DequeSet<OptimizeJob>,
 }
 
 impl AggregationUpdateQueue {
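The job items above capture the tracing `Span` that is current when work is enqueued and re-enter it when the job is later processed, so `trace_aggregation_update` traces attribute queued work to the code that produced it. A minimal standalone sketch of that pattern; `Job` and `WorkQueue` are illustrative stand-ins, and the example assumes the `tracing` and `tracing-subscriber` crates as dependencies:

```rust
use std::collections::VecDeque;

use tracing::{info, span::Span, trace_span};

struct Job {
    payload: u32,
    // Captured at enqueue time so the processing side can re-enter the
    // span of the code that scheduled the job.
    span: Option<Span>,
}

#[derive(Default)]
struct WorkQueue {
    jobs: VecDeque<Job>,
}

impl WorkQueue {
    fn push(&mut self, payload: u32) {
        self.jobs.push_back(Job {
            payload,
            // `Span::current()` clones a handle to whatever span is
            // active on this thread right now.
            span: Some(Span::current()),
        });
    }

    fn process_one(&mut self) -> Option<u32> {
        let job = self.jobs.pop_front()?;
        // Re-enter the producer's span, so everything traced while this
        // job runs is attributed to the code that enqueued it.
        let _guard = job.span.map(|s| s.entered());
        info!(payload = job.payload, "processing");
        Some(job.payload)
    }
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    let mut queue = WorkQueue::default();
    {
        let _outer = trace_span!("producer").entered();
        queue.push(42);
    }
    // Processed outside the producer span, but traced inside it.
    queue.process_one();
}
```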
@@ -351,9 +458,10 @@
             jobs: VecDeque::with_capacity(8),
             number_updates: FxIndexMap::default(),
             done_number_updates: FxHashMap::default(),
-            find_and_schedule: FxIndexSet::default(),
+            find_and_schedule: DequeSet::default(),
             done_find_and_schedule: FxHashSet::default(),
-            balance_queue: FxIndexSet::default(),
+            balance_queue: DequeSet::default(),
+            optimize_queue: DequeSet::default(),
         }
     }
 
@@ -403,15 +511,18 @@
                     entry.insert(AggregationNumberUpdate {
                         base_aggregation_number,
                         distance,
+                        #[cfg(feature = "trace_aggregation_update")]
+                        span: Some(Span::current()),
                     });
                 }
             };
         }
         AggregationUpdateJob::BalanceEdge { upper_id, task_id } => {
-            self.balance_queue.insert((upper_id, task_id));
+            self.balance_queue
+                .insert_back(BalanceJob::new(upper_id, task_id));
         }
         _ => {
-            self.jobs.push_back(job);
+            self.jobs.push_back(AggregationUpdateJobItem::new(job));
         }
     }
 }
@@ -424,7 +535,7 @@
 
     pub fn push_find_and_schedule_dirty(&mut self, task_id: TaskId) {
         if !self.done_find_and_schedule.contains(&task_id) {
-            self.find_and_schedule.insert(task_id);
+            self.find_and_schedule.insert_back(task_id);
         }
     }
 
@@ -436,6 +547,10 @@
         );
     }
 
+    pub fn push_optimize_task(&mut self, task_id: TaskId) {
+        self.optimize_queue.insert_back(OptimizeJob::new(task_id));
+    }
+
     pub fn run(job: AggregationUpdateJob, ctx: &mut impl ExecuteContext) {
         let mut queue = Self::new();
         queue.push(job);
@@ -444,7 +559,8 @@
 
     pub fn process(&mut self, ctx: &mut impl ExecuteContext) -> bool {
         if let Some(job) = self.jobs.pop_front() {
-            match job {
+            let job = job.entered();
+            match job.job {
                 AggregationUpdateJob::UpdateAggregationNumber { .. }
                | AggregationUpdateJob::BalanceEdge { .. } => {
                    // These jobs are never pushed to the queue
@@ -465,12 +581,12 @@
                 } else if uppers > followers {
                     if let Some(new_follower_id) = new_follower_ids.pop() {
                         if !new_follower_ids.is_empty() {
-                            self.jobs.push_front(
+                            self.jobs.push_front(AggregationUpdateJobItem::new(
                                 AggregationUpdateJob::InnerOfUppersHasNewFollowers {
                                     upper_ids: upper_ids.clone(),
                                     new_follower_ids,
                                 },
-                            );
+                            ));
                         }
                         self.inner_of_uppers_has_new_follower(ctx, new_follower_id, upper_ids);
                     }
@@ -478,12 +594,12 @@
                     #[allow(clippy::collapsible_if, reason = "readability")]
                     if let Some(upper_id) = upper_ids.pop() {
                         if !upper_ids.is_empty() {
-                            self.jobs.push_front(
+                            self.jobs.push_front(AggregationUpdateJobItem::new(
                                 AggregationUpdateJob::InnerOfUppersHasNewFollowers {
                                     upper_ids,
                                     new_follower_ids: new_follower_ids.clone(),
                                 },
-                            );
+                            ));
                         }
                         self.inner_of_upper_has_new_followers(ctx, new_follower_ids, upper_id);
                     }
@@ -522,12 +638,12 @@
                 if upper_ids.len() > lost_follower_ids.len() {
                     if let Some(lost_follower_id) = lost_follower_ids.pop() {
                         if !lost_follower_ids.is_empty() {
-                            self.jobs.push_front(
+                            self.jobs.push_front(AggregationUpdateJobItem::new(
                                 AggregationUpdateJob::InnerOfUppersLostFollowers {
                                     upper_ids: upper_ids.clone(),
                                     lost_follower_ids,
                                 },
-                            );
+                            ));
                         }
                         self.inner_of_uppers_lost_follower(ctx, lost_follower_id, upper_ids);
                     }
@@ -535,12 +651,12 @@
                     #[allow(clippy::collapsible_if, reason = "readability")]
                     if let Some(upper_id) = upper_ids.pop() {
                         if !upper_ids.is_empty() {
-                            self.jobs.push_front(
+                            self.jobs.push_front(AggregationUpdateJobItem::new(
                                 AggregationUpdateJob::InnerOfUppersLostFollowers {
                                     upper_ids,
                                     lost_follower_ids: lost_follower_ids.clone(),
                                 },
-                            );
+                            ));
                         }
                         self.inner_of_upper_lost_followers(ctx, lost_follower_ids, upper_id);
                     }
@@ -579,14 +695,28 @@
         } else if !self.number_updates.is_empty() {
             let mut remaining = MAX_COUNT_BEFORE_YIELD;
             while remaining > 0 {
-                if let Some((task_id, update)) = self.number_updates.pop() {
-                    self.done_number_updates.insert(task_id, update);
-                    self.update_aggregation_number(
-                        ctx,
+                if let Some((
+                    task_id,
+                    AggregationNumberUpdate {
+                        base_aggregation_number,
+                        distance,
+                        #[cfg(feature = "trace_aggregation_update")]
+                        span,
+                    },
+                )) = self.number_updates.pop()
+                {
+                    #[cfg(feature = "trace_aggregation_update")]
+                    let _guard = span.map(|s| s.entered());
+                    self.done_number_updates.insert(
                         task_id,
-                        update.distance,
-                        update.base_aggregation_number,
+                        AggregationNumberUpdate {
+                            base_aggregation_number,
+                            distance,
+                            #[cfg(feature = "trace_aggregation_update")]
+                            span: None,
+                        },
                     );
+                    self.update_aggregation_number(ctx, task_id, distance, base_aggregation_number);
                     remaining -= 1;
                 } else {
                     break;
@@ -596,18 +726,39 @@
                 }
             }
         } else if !self.balance_queue.is_empty() {
             let mut remaining = MAX_COUNT_BEFORE_YIELD;
             while remaining > 0 {
-                if let Some((upper_id, task_id)) = self.balance_queue.pop() {
-                    self.balance_edge(ctx, upper_id, task_id);
+                if let Some(BalanceJob {
+                    upper_id: upper,
+                    task_id: task,
+                    #[cfg(feature = "trace_aggregation_update")]
+                    span,
+                }) = self.balance_queue.pop_front()
+                {
+                    #[cfg(feature = "trace_aggregation_update")]
+                    let _guard = span.map(|s| s.entered());
+                    self.balance_edge(ctx, upper, task);
                     remaining -= 1;
                 } else {
                     break;
                 }
             }
             false
+        } else if let Some(OptimizeJob {
+            task_id,
+            #[cfg(feature = "trace_aggregation_update")]
+            span,
+        }) = self.optimize_queue.pop_front()
+        {
+            // Note: We must process one optimization completely before starting with the next
+            // one. Otherwise this could lead to optimizing every node of a subgraph of inner
+            // nodes, as all have the same upper count. Optimizing the root first avoids that.
+            #[cfg(feature = "trace_aggregation_update")]
+            let _guard = span.map(|s| s.entered());
+            self.optimize_task(ctx, task_id);
+            false
         } else if !self.find_and_schedule.is_empty() {
             let mut remaining = MAX_COUNT_BEFORE_YIELD;
             while remaining > 0 {
-                if let Some(task_id) = self.find_and_schedule.pop() {
+                if let Some(task_id) = self.find_and_schedule.pop_front() {
                     self.find_and_schedule_dirty(task_id, ctx);
                     remaining -= 1;
                 } else {
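`process()` drains the cheaper sub-queues in bounded batches and returns `false` so the caller can yield between calls, while an optimize job is handled one per call. A condensed sketch of that drain-with-yield shape; the job types and the `MAX_COUNT_BEFORE_YIELD` value are stand-ins, not the real ones:

```rust
use std::collections::VecDeque;

const MAX_COUNT_BEFORE_YIELD: usize = 100; // assumed value, not from the PR

struct Queue {
    cheap_jobs: VecDeque<u32>,
    expensive_jobs: VecDeque<u32>,
}

impl Queue {
    /// Returns `true` when all work is done; `false` means "call me again",
    /// giving the caller a chance to yield between batches.
    fn process(&mut self) -> bool {
        if !self.cheap_jobs.is_empty() {
            // Cheap jobs are drained in bounded batches.
            for _ in 0..MAX_COUNT_BEFORE_YIELD {
                let Some(job) = self.cheap_jobs.pop_front() else { break };
                run(job);
            }
            false
        } else if let Some(job) = self.expensive_jobs.pop_front() {
            // Expensive jobs (like graph optimization) run one at a time so
            // their follow-up work is visible before the next one starts.
            run(job);
            false
        } else {
            true
        }
    }
}

fn run(job: u32) {
    println!("job {job}");
}

fn main() {
    let mut q = Queue {
        cheap_jobs: (0..3).collect(),
        expensive_jobs: VecDeque::from([100]),
    };
    while !q.process() {}
}
```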
@@ -621,6 +772,9 @@
         }
     }
 
     fn balance_edge(&mut self, ctx: &mut impl ExecuteContext, upper_id: TaskId, task_id: TaskId) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!("process balance edge").entered();
+
         let (mut upper, mut task) = ctx.task_pair(upper_id, task_id, TaskDataCategory::Meta);
         let upper_aggregation_number = get_aggregation_number(&upper);
         let task_aggregation_number = get_aggregation_number(&task);
@@ -638,10 +792,21 @@
                 value: count,
             }),
             std::cmp::Ordering::Greater => {
+                #[cfg(feature = "trace_aggregation_update")]
+                let _span = trace_span!("make inner").entered();
+
                 let upper_ids = get_uppers(&upper);
 
                 // Add the same amount of upper edges
                 if update_count!(task, Upper { task: upper_id }, count) {
+                    if !upper_id.is_transient() {
+                        #[allow(clippy::collapsible_if, reason = "readability")]
+                        if update_ucount_and_get!(task, PersistentUpperCount, 1)
+                            .is_power_of_two()
+                        {
+                            self.push_optimize_task(task_id);
+                        }
+                    }
                     // When this is a new inner node, update aggregated data and
                     // followers
                     let data = AggregatedDataUpdate::from_task(&mut task);
@@ -688,6 +853,13 @@
                 value: count,
             }),
             Ordering::Greater => {
+                #[cfg(feature = "trace_aggregation_update")]
+                let _span = trace_span!("make follower").entered();
+
+                if !upper_id.is_transient() {
+                    update_ucount_and_get!(task, PersistentUpperCount, -1);
+                }
+
                 let upper_ids: Vec<_> = get_uppers(&upper);
 
                 // Add the same amount of follower edges
(n uppers)", uppers = upper_ids.len()).entered(); + let mut follower = ctx.task(lost_follower_id, TaskDataCategory::Meta); let mut follower_in_upper_ids = Vec::new(); + let mut persistent_uppers = 0; upper_ids.retain(|&upper_id| { let mut keep_upper = false; update!(follower, Upper { task: upper_id }, |old| { @@ -811,6 +979,9 @@ impl AggregationUpdateQueue { } if old == 1 { keep_upper = true; + if !upper_id.is_transient() { + persistent_uppers += 1; + } return None; } Some(old - 1) @@ -818,6 +989,8 @@ impl AggregationUpdateQueue { keep_upper }); if !upper_ids.is_empty() { + update_ucount_and_get!(follower, PersistentUpperCount, -persistent_uppers); + let data = AggregatedDataUpdate::from_task(&mut follower).invert(); let followers: Vec<_> = get_followers(&follower); drop(follower); @@ -870,6 +1043,13 @@ impl AggregationUpdateQueue { mut lost_follower_ids: Vec, upper_id: TaskId, ) { + #[cfg(feature = "trace_aggregation_update")] + let _span = trace_span!( + "lost follower (n follower)", + followers = lost_follower_ids.len() + ) + .entered(); + lost_follower_ids.retain(|lost_follower_id| { let mut follower = ctx.task(*lost_follower_id, TaskDataCategory::Meta); let mut remove_upper = false; @@ -890,6 +1070,10 @@ impl AggregationUpdateQueue { Some(old - 1) }); if remove_upper { + if !upper_id.is_transient() { + update_ucount_and_get!(follower, PersistentUpperCount, -1); + } + let data = AggregatedDataUpdate::from_task(&mut follower).invert(); let followers: Vec<_> = get_followers(&follower); drop(follower); @@ -941,6 +1125,10 @@ impl AggregationUpdateQueue { new_follower_id: TaskId, mut upper_ids: Vec, ) { + #[cfg(feature = "trace_aggregation_update")] + let _span = + trace_span!("process new follower (n uppers)", uppers = upper_ids.len()).entered(); + let follower_aggregation_number = { let follower = ctx.task(new_follower_id, TaskDataCategory::Meta); get_aggregation_number(&follower) @@ -977,16 +1165,35 @@ impl AggregationUpdateQueue { if !upper_ids.is_empty() { let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta); + let mut uppers_count: Option = None; + let mut persistent_uppers = 0; upper_ids.retain(|&upper_id| { if update_count!(follower, Upper { task: upper_id }, 1) { // It's a new upper + let uppers_count = uppers_count.get_or_insert_with(|| { + let count = + iter_many!(follower, Upper { .. 
@@ -796,8 +960,12 @@
         lost_follower_id: TaskId,
         mut upper_ids: Vec<TaskId>,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!("lost follower (n uppers)", uppers = upper_ids.len()).entered();
+
         let mut follower = ctx.task(lost_follower_id, TaskDataCategory::Meta);
         let mut follower_in_upper_ids = Vec::new();
+        let mut persistent_uppers = 0;
         upper_ids.retain(|&upper_id| {
             let mut keep_upper = false;
             update!(follower, Upper { task: upper_id }, |old| {
@@ -811,6 +979,9 @@
                 }
                 if old == 1 {
                     keep_upper = true;
+                    if !upper_id.is_transient() {
+                        persistent_uppers += 1;
+                    }
                     return None;
                 }
                 Some(old - 1)
@@ -818,6 +989,8 @@
             keep_upper
         });
         if !upper_ids.is_empty() {
+            update_ucount_and_get!(follower, PersistentUpperCount, -persistent_uppers);
+
             let data = AggregatedDataUpdate::from_task(&mut follower).invert();
             let followers: Vec<_> = get_followers(&follower);
             drop(follower);
@@ -870,6 +1043,13 @@
         mut lost_follower_ids: Vec<TaskId>,
         upper_id: TaskId,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!(
+            "lost follower (n follower)",
+            followers = lost_follower_ids.len()
+        )
+        .entered();
+
         lost_follower_ids.retain(|lost_follower_id| {
             let mut follower = ctx.task(*lost_follower_id, TaskDataCategory::Meta);
             let mut remove_upper = false;
@@ -890,6 +1070,10 @@
                 Some(old - 1)
             });
             if remove_upper {
+                if !upper_id.is_transient() {
+                    update_ucount_and_get!(follower, PersistentUpperCount, -1);
+                }
+
                 let data = AggregatedDataUpdate::from_task(&mut follower).invert();
                 let followers: Vec<_> = get_followers(&follower);
                 drop(follower);
@@ -941,6 +1125,10 @@
         new_follower_id: TaskId,
         mut upper_ids: Vec<TaskId>,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span =
+            trace_span!("process new follower (n uppers)", uppers = upper_ids.len()).entered();
+
         let follower_aggregation_number = {
             let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
             get_aggregation_number(&follower)
@@ -977,16 +1165,35 @@
 
         if !upper_ids.is_empty() {
             let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
+            let mut uppers_count: Option<usize> = None;
+            let mut persistent_uppers = 0;
             upper_ids.retain(|&upper_id| {
                 if update_count!(follower, Upper { task: upper_id }, 1) {
                     // It's a new upper
+                    let uppers_count = uppers_count.get_or_insert_with(|| {
+                        let count =
+                            iter_many!(follower, Upper { .. } count if *count > 0 => ()).count();
+                        count - 1
+                    });
+                    *uppers_count += 1;
+                    if !upper_id.is_transient() {
+                        persistent_uppers += 1;
+                    }
                     true
                 } else {
                     // It's already an upper
                     false
                 }
             });
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("new inner").entered();
             if !upper_ids.is_empty() {
+                if update_ucount_and_get!(follower, PersistentUpperCount, persistent_uppers)
+                    .is_power_of_two()
+                {
+                    self.push_optimize_task(new_follower_id);
+                }
+
                 let data = AggregatedDataUpdate::from_task(&mut follower);
                 let children: Vec<_> = get_followers(&follower);
                 drop(follower);
@@ -1019,6 +1226,8 @@
             self.push_find_and_schedule_dirty(new_follower_id);
         }
         if !upper_ids_as_follower.is_empty() {
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("new follower").entered();
             self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
                 upper_ids: upper_ids_as_follower,
                 new_follower_id,
@@ -1032,6 +1241,13 @@
         new_follower_ids: Vec<TaskId>,
         upper_id: TaskId,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!(
+            "process new follower (n followers)",
+            followers = new_follower_ids.len()
+        )
+        .entered();
+
         let mut followers_with_aggregation_number = new_follower_ids
             .into_iter()
             .map(|new_follower_id| {
@@ -1071,6 +1287,13 @@
         for &(follower_id, _) in followers_with_aggregation_number.iter() {
             let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
             if update_count!(follower, Upper { task: upper_id }, 1) {
+                if !upper_id.is_transient() {
+                    #[allow(clippy::collapsible_if, reason = "readability")]
+                    if update_ucount_and_get!(follower, PersistentUpperCount, 1).is_power_of_two() {
+                        self.push_optimize_task(follower_id);
+                    }
+                }
+
                 // It's a new upper
                 let data = AggregatedDataUpdate::from_task(&mut follower);
                 let children: Vec<_> = get_followers(&follower);
@@ -1083,11 +1306,16 @@
             }
         }
         if !upper_new_followers.is_empty() {
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("new follower").entered();
+
             self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
                 upper_id,
                 new_follower_ids: upper_new_followers,
             });
         }
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!("new inner").entered();
         if !upper_data_updates.is_empty() {
             // add data to upper
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
@@ -1137,6 +1365,9 @@
         new_follower_id: TaskId,
         upper_id: TaskId,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!("process new follower").entered();
+
         let follower_aggregation_number = {
             let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
             get_aggregation_number(&follower)
@@ -1144,7 +1375,7 @@
 
         let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
         if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
-            self.find_and_schedule.insert(new_follower_id);
+            self.find_and_schedule.insert_back(new_follower_id);
         }
         // decide if it should be an inner or follower
         let upper_aggregation_number = get_aggregation_number(&upper);
@@ -1152,6 +1383,9 @@
         if !is_root_node(upper_aggregation_number)
             && upper_aggregation_number <= follower_aggregation_number
         {
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("new follower").entered();
+
             // It's a follower of the upper node
             if update_count!(
                 upper,
@@ -1167,10 +1401,19 @@
                 });
             }
         } else {
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("new inner").entered();
+
             // It's an inner node, continue with the list
             drop(upper);
             let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
             if update_count!(follower, Upper { task: upper_id }, 1) {
+                if !upper_id.is_transient() {
+                    #[allow(clippy::collapsible_if, reason = "readability")]
+                    if update_ucount_and_get!(follower, PersistentUpperCount, 1).is_power_of_two() {
+                        self.push_optimize_task(new_follower_id);
+                    }
+                }
                 // It's a new upper
                 let data = AggregatedDataUpdate::from_task(&mut follower);
                 let children: Vec<_> = get_followers(&follower);
@@ -1203,6 +1446,10 @@
         base_effective_distance: Option<NonZeroU32>,
         base_aggregation_number: u32,
     ) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span =
+            trace_span!("process update aggregation number", base_aggregation_number).entered();
+
         let mut task = ctx.task(task_id, TaskDataCategory::Meta);
         let current = get!(task, AggregationNumber).copied().unwrap_or_default();
         let old = current.effective;
@@ -1233,6 +1480,8 @@
                 });
             }
         } else {
+            #[cfg(feature = "trace_aggregation_update")]
+            let _span = trace_span!("update aggregation number", aggregation_number).entered();
             task.insert(CachedDataItem::AggregationNumber {
                 value: AggregationNumber {
                     base: base_aggregation_number,
@@ -1279,6 +1528,112 @@
             }
         }
     }
+
+    fn optimize_task(&mut self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = trace_span!("optimize").entered();
+
+        let task = ctx.task(task_id, TaskDataCategory::Meta);
+        let aggregation_number = get!(task, AggregationNumber).copied().unwrap_or_default();
+        if is_root_node(aggregation_number.effective) {
+            return;
+        }
+        let upper_count = get!(task, PersistentUpperCount)
+            .copied()
+            .unwrap_or_default();
+        if upper_count <= aggregation_number.effective {
+            // Doesn't need optimization
+            return;
+        }
+        let uppers = get_uppers(&task);
+        drop(task);
+
+        if !is_aggregating_node(aggregation_number.effective) {
+            self.push(AggregationUpdateJob::UpdateAggregationNumber {
+                task_id,
+                base_aggregation_number: LEAF_NUMBER,
+                distance: None,
+            });
+            return;
+        }
+
+        let mut root_uppers = 0;
+
+        let mut uppers_aggregation_numbers = uppers
+            .iter()
+            .filter_map(|upper_id| {
+                if upper_id.is_transient() {
+                    return None;
+                }
+                let upper = ctx.task(*upper_id, TaskDataCategory::Meta);
+                let n = get_aggregation_number(&upper);
+                if is_root_node(n) {
+                    root_uppers += 1;
+                    None
+                } else {
+                    Some(Reverse(n))
+                }
+            })
+            .collect::<Vec<_>>();
+        uppers_aggregation_numbers.sort_unstable();
+
+        // This is the aggregation number where work is minimal
+        let min_work_aggregation_number = if let Some(upper) = uppers_aggregation_numbers.first() {
+            upper.0 + 1
+        } else {
+            return;
+        };
+        let minimal_work = root_uppers;
+
+        let mut new_aggregation_number = max(
+            min_work_aggregation_number,
+            minimal_work.try_into().unwrap_or(u32::MAX),
+        );
+
+        let mut i = 0;
+        loop {
+            // A smaller aggregation number will conflict
+            let mut next_aggregation_number = new_aggregation_number - 1;
+
+            // Find a possibly smaller aggregation number that is still valid
+            while let Some(n) = uppers_aggregation_numbers.get(i) {
+                match n.0.cmp(&next_aggregation_number) {
+                    std::cmp::Ordering::Less => {
+                        i += 1;
+                    }
+                    std::cmp::Ordering::Equal => {
+                        next_aggregation_number -= 1;
+                        i += 1;
+                    }
+                    std::cmp::Ordering::Greater => break,
+                }
+            }
+
+            // Compute the work for that case
+            let work = root_uppers + i;
+            if work > next_aggregation_number as usize {
+                break;
+            }
+
+            // Find the smallest number in that range
+            let min_aggregation_number = if let Some(upper) = uppers_aggregation_numbers.get(i) {
+                upper.0 + 1
+            } else {
+                aggregation_number.effective
+            };
+            new_aggregation_number =
+                max(min_aggregation_number, work.try_into().unwrap_or(u32::MAX));
+        }
+
+        if aggregation_number.effective != new_aggregation_number {
+            self.push(AggregationUpdateJob::UpdateAggregationNumber {
+                task_id,
+                base_aggregation_number: new_aggregation_number
+                    .saturating_sub(aggregation_number.distance),
+                distance: None,
+            });
+        }
+    }
 }
 
 impl Operation for AggregationUpdateQueue {
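`optimize_task` collects the uppers' aggregation numbers wrapped in `std::cmp::Reverse` and sorts with `sort_unstable`, which yields a descending order; `first()` is then the largest upper number, and one above it is the starting candidate at which no upper conflicts. A tiny demonstration of that `Reverse` sort trick in isolation:

```rust
use std::cmp::Reverse;

fn main() {
    // Wrapping values in `Reverse` inverts their ordering, so a normal
    // ascending sort produces a descending sequence of the inner values.
    let mut numbers = vec![Reverse(4u32), Reverse(16), Reverse(7)];
    numbers.sort_unstable();
    let sorted: Vec<u32> = numbers.into_iter().map(|r| r.0).collect();
    assert_eq!(sorted, [16, 7, 4]);
    // `first()` of the Reverse-sorted vector is the largest number; one
    // above it (17 here) is a conflict-free aggregation number candidate.
    println!("{sorted:?}");
}
```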
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
index 20003523bef07..f0fc490613381 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
@@ -13,7 +13,7 @@ use crate::{
         invalidate::{make_task_dirty, TaskDirtyCause},
         AggregatedDataUpdate, ExecuteContext, Operation, TaskGuard,
     },
-        storage::{update, update_count},
+        storage::{update_count, update_ucount_and_get},
         TaskDataCategory,
     },
     data::{CachedDataItemKey, CellRef, CollectibleRef, CollectiblesRef},
@@ -83,11 +83,7 @@ impl Operation for CleanupOldEdgesOperation {
                         task.remove(&CachedDataItemKey::Child { task: child_id });
                     }
                     let remove_children_count = u32::try_from(children.len()).unwrap();
-                    update!(task, ChildrenCount, |count: Option<u32>| {
-                        // If this underflows, we messed up counting somewhere
-                        let count = count.unwrap_or_default() - remove_children_count;
-                        (count != 0).then_some(count)
-                    });
+                    update_ucount_and_get!(task, ChildrenCount, -remove_children_count);
                     if is_aggregating_node(get_aggregation_number(&task)) {
                         queue.push(AggregationUpdateJob::InnerOfUpperLostFollowers {
                             upper_id: task_id,
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
index 36b54258d4b9d..450e24757d5cd 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
@@ -12,7 +12,7 @@ use crate::{
         },
         is_root_node, ExecuteContext, Operation, TaskGuard,
     },
-        storage::{get, update},
+        storage::{get, update_ucount_and_get},
         TaskDataCategory,
     },
     data::{CachedDataItem, CachedDataItemKey},
@@ -46,17 +46,12 @@ impl ConnectChildOperation {
             task: child_task_id,
             value: (),
         }) {
-            // Update the children count
-            let mut children_count = 0;
-            update!(parent_task, ChildrenCount, |count: Option<u32>| {
-                children_count = count.unwrap_or_default() + 1;
-                Some(children_count)
-            });
-
-            // Update the task aggregation
             let mut queue = AggregationUpdateQueue::new();
 
-            // Compute new parent aggregation number based on the number of children
+            // Update the children count
+            let children_count = update_ucount_and_get!(parent_task, ChildrenCount, 1);
+
+            // Compute future parent aggregation number based on the number of children
             let current_parent_aggregation = get!(parent_task, AggregationNumber)
                 .copied()
                 .unwrap_or_default();
@@ -130,6 +125,8 @@ impl ConnectChildOperation {
             }
         }
 
+        #[cfg(feature = "trace_aggregation_update")]
+        let _span = tracing::trace_span!("connect_child").entered();
         ConnectChildOperation::UpdateAggregation {
            aggregation_update: queue,
        }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
index d49e712617b75..fb475878929a2 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -487,12 +487,73 @@ macro_rules! update {
     };
 }
 
+macro_rules! update_ucount_and_get {
+    ($task:ident, $key:ident $input:tt, -$update:expr) => {
+        match $update {
+            update => {
+                let mut value = 0;
+                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
+                    if let Some(old) = old {
+                        value = old - update;
+                        (value != 0).then_some(value)
+                    } else {
+                        None
+                    }
+                });
+                value
+            }
+        }
+    };
+    ($task:ident, $key:ident $input:tt, $update:expr) => {
+        match $update {
+            update => {
+                let mut value = 0;
+                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
+                    if let Some(old) = old {
+                        value = old + update;
+                        (value != 0).then_some(value)
+                    } else {
+                        value = update;
+                        (update != 0).then_some(update)
+                    }
+                });
+                value
+            }
+        }
+    };
+    ($task:ident, $key:ident, -$update:expr) => {
+        $crate::backend::storage::update_ucount_and_get!($task, $key {}, -$update)
+    };
+    ($task:ident, $key:ident, $update:expr) => {
+        $crate::backend::storage::update_ucount_and_get!($task, $key {}, $update)
+    };
+}
+
 macro_rules! update_count {
+    ($task:ident, $key:ident $input:tt, -$update:expr) => {
+        match $update {
+            update => {
+                let mut state_change = false;
+                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
+                    #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
+                    if let Some(old) = old {
+                        let new = old - update;
+                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
+                        (new != 0).then_some(new)
+                    } else {
+                        state_change = update < 0;
+                        (update != 0).then_some(-update)
+                    }
+                });
+                state_change
+            }
+        }
+    };
     ($task:ident, $key:ident $input:tt, $update:expr) => {
         match $update {
             update => {
                 let mut state_change = false;
-                $crate::backend::storage::update!($task, $key $input, |old: Option<i32>| {
+                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                     if let Some(old) = old {
                         let new = old + update;
                         state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
@@ -506,7 +567,9 @@ macro_rules! update_count {
             }
         }
     };
-    ($task:ident, $key:ident, $update:expr) => {
+    ($task:ident, $key:ident, -$update:expr) => {
+        $crate::backend::storage::update_count!($task, $key {}, -$update)
+    };
+    ($task:ident, $key:ident, $update:expr) => {
         $crate::backend::storage::update_count!($task, $key {}, $update)
     };
 }
@@ -535,3 +598,4 @@ pub(crate) use iter_many;
 pub(crate) use remove;
 pub(crate) use update;
 pub(crate) use update_count;
+pub(crate) use update_ucount_and_get;
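`update_ucount_and_get!` adjusts an unsigned count stored as an optional item and returns the new value, dropping the entry when it reaches zero; the dedicated `-$update:expr` macro arm lets call sites write a leading minus (`update_ucount_and_get!(task, ChildrenCount, -n)`) even though the stored type is unsigned. A plain-function model of those semantics over an `Option` slot (illustrative only; the real macro delegates to `update!` on task storage):

```rust
fn update_ucount_and_get(slot: &mut Option<u32>, delta: i64) -> u32 {
    let Some(old) = *slot else {
        // Like the macro's subtraction arm: decrementing a missing entry
        // leaves it missing; incrementing one stores the delta.
        if delta > 0 {
            *slot = Some(delta as u32);
            return delta as u32;
        }
        return 0;
    };
    // An underflow here would be a counting bug, as the removed comment
    // in cleanup_old_edges.rs noted.
    let new = (old as i64 + delta) as u32;
    // `None` means "no entry stored": zero counts are removed entirely.
    *slot = (new != 0).then_some(new);
    new
}

fn main() {
    let mut count = None;
    assert_eq!(update_ucount_and_get(&mut count, 2), 2);
    assert_eq!(update_ucount_and_get(&mut count, -1), 1);
    assert_eq!(update_ucount_and_get(&mut count, -1), 0);
    assert_eq!(count, None); // zero counts are dropped from storage
}
```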
diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs
index 34222a46fc9ff..8e6ef6be0faf0 100644
--- a/turbopack/crates/turbo-tasks-backend/src/data.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/data.rs
@@ -369,6 +369,10 @@ pub enum CachedDataItem {
         task: TaskId,
         value: i32,
     },
+    PersistentUpperCount {
+        // Only counting persistent tasks
+        value: u32,
+    },
 
     // Aggregated Data
     AggregatedDirtyContainer {
@@ -453,6 +457,7 @@ impl CachedDataItem {
             CachedDataItem::AggregationNumber { .. } => true,
             CachedDataItem::Follower { task, .. } => !task.is_transient(),
             CachedDataItem::Upper { task, .. } => !task.is_transient(),
+            CachedDataItem::PersistentUpperCount { .. } => true,
             CachedDataItem::AggregatedDirtyContainer { task, .. } => !task.is_transient(),
             CachedDataItem::AggregatedCollectible { collectible, .. } => {
                 !collectible.cell.task.is_transient()
@@ -518,6 +523,7 @@ impl CachedDataItemKey {
             CachedDataItemKey::AggregationNumber { .. } => true,
             CachedDataItemKey::Follower { task, .. } => !task.is_transient(),
             CachedDataItemKey::Upper { task, .. } => !task.is_transient(),
+            CachedDataItemKey::PersistentUpperCount {} => true,
             CachedDataItemKey::AggregatedDirtyContainer { task, .. } => !task.is_transient(),
             CachedDataItemKey::AggregatedCollectible { collectible, .. } => {
                 !collectible.cell.task.is_transient()
@@ -562,6 +568,7 @@ impl CachedDataItemKey {
             | CachedDataItemKey::Dirty { .. }
             | CachedDataItemKey::Follower { .. }
             | CachedDataItemKey::Upper { .. }
+            | CachedDataItemKey::PersistentUpperCount { .. }
             | CachedDataItemKey::AggregatedDirtyContainer { .. }
             | CachedDataItemKey::AggregatedCollectible { .. }
             | CachedDataItemKey::AggregatedDirtyContainerCount { .. }
diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/deque_set.rs b/turbopack/crates/turbo-tasks-backend/src/utils/deque_set.rs
new file mode 100644
index 0000000000000..fbf32aec136af
--- /dev/null
+++ b/turbopack/crates/turbo-tasks-backend/src/utils/deque_set.rs
@@ -0,0 +1,70 @@
+use std::{
+    collections::{HashSet, VecDeque},
+    hash::{BuildHasher, BuildHasherDefault, Hash},
+};
+
+use rustc_hash::FxHasher;
+use serde::{Deserialize, Serialize};
+
+// TODO This could be more efficient:
+// - Not storing items twice
+// - Not serializing items twice
+
+#[derive(Clone, Serialize, Deserialize)]
+#[serde(bound(
+    deserialize = "T: Hash + Eq + Deserialize<'de>, B: BuildHasher + Default",
+    serialize = "T: Hash + Eq + Serialize, B: BuildHasher + Default"
+))]
+pub struct DequeSet<T, B: BuildHasher + Default = BuildHasherDefault<FxHasher>> {
+    set: HashSet<T, B>,
+    queue: VecDeque<T>,
+}
+
+impl<T, B: BuildHasher + Default> Default for DequeSet<T, B> {
+    fn default() -> Self {
+        Self {
+            set: Default::default(),
+            queue: Default::default(),
+        }
+    }
+}
+
+impl<T, B: BuildHasher + Default> DequeSet<T, B> {
+    pub fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+
+    #[allow(dead_code)]
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
+}
+
+impl<T: Hash + Eq + Clone, B: BuildHasher + Default> DequeSet<T, B> {
+    pub fn insert_back(&mut self, item: T) -> bool {
+        if self.set.insert(item.clone()) {
+            self.queue.push_back(item);
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn pop_front(&mut self) -> Option<T> {
+        if let Some(item) = self.queue.pop_front() {
+            self.set.remove(&item);
+            Some(item)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T: Hash + Eq + Clone, B: BuildHasher + Default> Extend<T> for DequeSet<T, B> {
+    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
+        self.queue.extend(
+            iter.into_iter()
+                .filter(|item| self.set.insert(item.clone())),
+        );
+    }
+}
diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs b/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs
index c2c585efbd0cf..1c29836ddb075 100644
--- a/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs
@@ -1,5 +1,6 @@
 pub mod bi_map;
 pub mod chunked_vec;
 pub mod dash_map_multi;
+pub mod deque_set;
 pub mod ptr_eq_arc;
 pub mod sharded;
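A test sketch that could sit next to the new `utils/deque_set.rs` module (it is not part of this PR): `insert_back` deduplicates only against items that are still pending, and `pop_front` removes the item from the set again, so a task can be re-queued after it has been processed.

```rust
#[cfg(test)]
mod tests {
    use super::DequeSet;

    #[test]
    fn dedup_fifo() {
        // Uses the default FxHasher-backed hasher parameter.
        let mut queue: DequeSet<u32> = DequeSet::default();
        assert!(queue.insert_back(1)); // newly queued
        assert!(queue.insert_back(2));
        assert!(!queue.insert_back(1)); // still pending: dropped
        assert_eq!(queue.pop_front(), Some(1)); // FIFO order
        assert!(queue.insert_back(1)); // popped items may be queued again
        assert_eq!(queue.len(), 2);
    }
}
```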