Skip to content

Commit

Permalink
[Turbopack] new backend cleanup (#71132)
Browse files Browse the repository at this point in the history
### What?

review comments from #70945
  • Loading branch information
sokra authored Oct 11, 2024
1 parent fd552c5 commit d6e7503
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 41 deletions.
21 changes: 14 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl TurboTasksBackendInner {

// Check the dirty count of the root node
let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default()
.get(self.session_id);
if dirty_tasks > 0 || is_dirty {
Expand All @@ -375,7 +375,14 @@ impl TurboTasksBackendInner {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
// A newly added AggregateRoot need to make sure to schedule the tasks
task_ids_to_schedule = get_many!(task, AggregatedDirtyContainer { task } count if count.get(self.session_id) > 0 => task);
task_ids_to_schedule = get_many!(
task,
AggregatedDirtyContainer {
task
} count if count.get(self.session_id) > 0 => {
*task
}
);
if is_dirty {
task_ids_to_schedule.push(task_id);
}
Expand Down Expand Up @@ -1076,7 +1083,7 @@ impl TurboTasksBackendInner {
// handle cell counters: update max index and remove cells that are no longer used
let mut removed_cells = HashMap::new();
let mut old_counters: HashMap<_, _> =
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index));
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (*cell_type, *max_index));
for (&cell_type, &max_index) in cell_counters.iter() {
if let Some(old_max_index) = old_counters.remove(&cell_type) {
if old_max_index != max_index {
Expand Down Expand Up @@ -1234,7 +1241,7 @@ impl TurboTasksBackendInner {

let data_update = if old_dirty_state.is_some() || new_dirty_state.is_some() {
let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
if let Some(old_dirty_state) = old_dirty_state {
dirty_containers.update_with_dirty_state(&old_dirty_state);
Expand All @@ -1245,7 +1252,7 @@ impl TurboTasksBackendInner {
(None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
(Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
};
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
if aggregated_update.get(self.session_id) < 0 {
if let Some(root_state) = get!(task, AggregateRoot) {
root_state.all_clean_event.notify(usize::MAX);
Expand Down Expand Up @@ -1403,7 +1410,7 @@ impl TurboTasksBackendInner {
task,
AggregatedCollectible {
collectible
} count if collectible.collectible_type == collectible_type && count > 0 => {
} count if collectible.collectible_type == collectible_type && *count > 0 => {
collectible.cell
}
) {
Expand All @@ -1416,7 +1423,7 @@ impl TurboTasksBackendInner {
Collectible {
collectible
} count if collectible.collectible_type == collectible_type => {
(collectible.cell, count)
(collectible.cell, *count)
}
) {
*collectibles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ fn get_followers_with_aggregation_number(
aggregation_number: u32,
) -> Vec<TaskId> {
if is_aggregating_node(aggregation_number) {
get_many!(task, Follower { task } count if count > 0 => task)
get_many!(task, Follower { task } count if *count > 0 => *task)
} else {
get_many!(task, Child { task } => task)
get_many!(task, Child { task } => *task)
}
}

Expand All @@ -42,11 +42,11 @@ fn get_followers(task: &TaskGuard<'_>) -> Vec<TaskId> {
}

pub fn get_uppers(task: &TaskGuard<'_>) -> Vec<TaskId> {
get_many!(task, Upper { task } count if count > 0 => task)
get_many!(task, Upper { task } count if *count > 0 => *task)
}

fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator<Item = TaskId> + 'a {
iter_many!(task, Upper { task } count if count > 0 => task)
iter_many!(task, Upper { task } count if *count > 0 => *task)
}

pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 {
Expand Down Expand Up @@ -126,17 +126,17 @@ impl AggregatedDataUpdate {
let aggregation = get_aggregation_number(task);
let mut dirty_container_count = Default::default();
let mut collectibles_update: Vec<_> =
get_many!(task, Collectible { collectible } => (collectible, 1));
get_many!(task, Collectible { collectible } => (*collectible, 1));
if is_aggregating_node(aggregation) {
dirty_container_count = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
let collectibles = iter_many!(
task,
AggregatedCollectible {
collectible
} count if count > 0 => {
collectible
} count if *count > 0 => {
*collectible
}
);
for collectible in collectibles {
Expand All @@ -148,7 +148,7 @@ impl AggregatedDataUpdate {
}

let mut result = Self::new().collectibles_update(collectibles_update);
if !dirty_container_count.is_default() {
if !dirty_container_count.is_zero() {
let DirtyContainerCount {
count,
count_in_session,
Expand All @@ -170,7 +170,7 @@ impl AggregatedDataUpdate {
collectibles_update,
} = &mut self;
if let Some((_, value)) = dirty_container_update.as_mut() {
*value = value.invert()
*value = value.negate()
}
for (_, value) in collectibles_update.iter_mut() {
*value = -*value;
Expand Down Expand Up @@ -199,7 +199,7 @@ impl AggregatedDataUpdate {
})
}

let mut aggregated_update = Default::default();
let mut aggregated_update = DirtyContainerCount::default();
update!(
task,
AggregatedDirtyContainer {
Expand All @@ -208,7 +208,7 @@ impl AggregatedDataUpdate {
|old: Option<DirtyContainerCount>| {
let mut new = old.unwrap_or_default();
aggregated_update = new.update_count(count);
(!new.is_default()).then_some(new)
(!new.is_zero()).then_some(new)
}
);

Expand All @@ -225,10 +225,10 @@ impl AggregatedDataUpdate {
if let Some(dirty_state) = dirty_state {
new.undo_update_with_dirty_state(&dirty_state);
}
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
result.dirty_container_update = Some((task_id, aggregated_update));
}
(!new.is_default()).then_some(new)
(!new.is_zero()).then_some(new)
});
if let Some((_, count)) = result.dirty_container_update.as_ref() {
if count.get(session_id) < 0 {
Expand Down Expand Up @@ -269,8 +269,8 @@ impl AggregatedDataUpdate {
CollectiblesDependent {
collectible_type,
task,
} if collectible_type == ty => {
task
} if *collectible_type == ty => {
*task
}
);
if !dependent.is_empty() {
Expand Down Expand Up @@ -608,7 +608,7 @@ impl AggregationUpdateQueue {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
}
let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task);
let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => *task);
if !dirty_containers.is_empty() {
self.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: dirty_containers,
Expand Down Expand Up @@ -954,7 +954,7 @@ impl AggregationUpdateQueue {
if !is_aggregating_node(old) && is_aggregating_node(aggregation_number) {
// When converted from leaf to aggregating node, all children become
// followers
let children: Vec<_> = get_many!(task, Child { task } => task);
let children: Vec<_> = get_many!(task, Child { task } => *task);
for child_id in children {
task.add_new(CachedDataItem::Follower {
task: child_id,
Expand All @@ -966,7 +966,7 @@ impl AggregationUpdateQueue {
if is_aggregating_node(aggregation_number) {
// followers might become inner nodes when the aggregation number is
// increased
let followers = iter_many!(task, Follower { task } count if count > 0 => task);
let followers = iter_many!(task, Follower { task } count if *count > 0 => *task);
for follower_id in followers {
self.push(AggregationUpdateJob::BalanceEdge {
upper_id: task_id,
Expand All @@ -978,7 +978,7 @@ impl AggregationUpdateQueue {
self.push(AggregationUpdateJob::BalanceEdge { upper_id, task_id });
}
} else {
let children = iter_many!(task, Child { task } => task);
let children = iter_many!(task, Child { task } => *task);
for child_id in children {
self.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: child_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,23 @@ pub fn make_task_dirty_internal(
}) => {
// Got dirty in that one session only
let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
dirty_container.update_session_dependent(session_id, 1);
dirty_container
}
None => {
// Get dirty for all sessions
get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default()
}
_ => unreachable!(),
};
let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
clean_in_session: None,
});
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
queue.extend(AggregationUpdateJob::data_update(
task,
AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ impl UpdateCellOperation {

let dependent = get_many!(
task,
CellDependent { cell: dependent_cell, task } _value
if dependent_cell == cell
=> task
CellDependent { cell: dependent_cell, task }
if *dependent_cell == cell
=> *task
);

drop(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl UpdateOutputOperation {
value: output_value,
});

let dependent_tasks = get_many!(task, OutputDependent { task } => task);
let children = get_many!(task, Child { task } => task);
let dependent_tasks = get_many!(task, OutputDependent { task } => *task);
let children = get_many!(task, Child { task } => *task);

let mut queue = AggregationUpdateQueue::new();

Expand Down
6 changes: 3 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ macro_rules! iter_many {
$task
.iter($crate::data::indicies::$key)
.filter_map(|(key, _)| match key {
&$crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
$crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
$iter_item
),
_ => None,
Expand All @@ -446,8 +446,8 @@ macro_rules! iter_many {
.iter($crate::data::indicies::$key)
.filter_map(|(key, value)| match (key, value) {
(
&$crate::data::CachedDataItemKey::$key $input,
&$crate::data::CachedDataItemValue::$key { value: $value_pattern }
$crate::data::CachedDataItemKey::$key $input,
$crate::data::CachedDataItemValue::$key { value: $value_pattern }
) $(if $cond)? => Some($iter_item),
_ => None,
})
Expand Down
25 changes: 22 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,18 @@ fn add_with_diff(v: &mut i32, u: i32) -> i32 {
}
}

#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
/// Represents a count of dirty containers. Since dirtyness can be session dependent, there might be
/// a different count for a specific session. It only need to store the highest session count, since
/// old sessions can't be visited again, so we can ignore their counts.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirtyContainerCount {
pub count: i32,
pub count_in_session: Option<(SessionId, i32)>,
}

impl DirtyContainerCount {
/// Get the count for a specific session. It's only expected to be asked for the current
/// session, since old session counts might be dropped.
pub fn get(&self, session: SessionId) -> i32 {
if let Some((s, count)) = self.count_in_session {
if s == session {
Expand All @@ -124,13 +129,16 @@ impl DirtyContainerCount {
self.count
}

/// Increase/decrease the count by the given value.
pub fn update(&mut self, count: i32) -> DirtyContainerCount {
self.update_count(&DirtyContainerCount {
count,
count_in_session: None,
})
}

/// Increase/decrease the count by the given value, but does not update the count for a specific
/// session. This matches the "dirty, but clean in one session" behavior.
pub fn update_session_dependent(
&mut self,
ignore_session: SessionId,
Expand All @@ -142,6 +150,10 @@ impl DirtyContainerCount {
})
}

/// Adds the `count` to the current count. This correctly handles session dependent counts.
/// Returns a new count object that represents the aggregated count. The aggregated count will
/// be +1 when the self count changes from <= 0 to > 0 and -1 when the self count changes from >
/// 0 to <= 0. The same for the session dependent count.
pub fn update_count(&mut self, count: &DirtyContainerCount) -> DirtyContainerCount {
let mut diff = DirtyContainerCount::default();
match (
Expand Down Expand Up @@ -181,6 +193,7 @@ impl DirtyContainerCount {
diff
}

/// Applies a dirty state to the count. Returns an aggregated count that represents the change.
pub fn update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount {
if let Some(clean_in_session) = dirty.clean_in_session {
self.update_session_dependent(clean_in_session, 1)
Expand All @@ -189,6 +202,8 @@ impl DirtyContainerCount {
}
}

/// Undoes the effect of a dirty state on the count. Returns an aggregated count that represents
/// the change.
pub fn undo_update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount {
if let Some(clean_in_session) = dirty.clean_in_session {
self.update_session_dependent(clean_in_session, -1)
Expand All @@ -197,6 +212,8 @@ impl DirtyContainerCount {
}
}

/// Replaces the old dirty state with the new one. Returns an aggregated count that represents
/// the change.
pub fn replace_dirty_state(
&mut self,
old: &DirtyState,
Expand All @@ -207,11 +224,13 @@ impl DirtyContainerCount {
diff
}

pub fn is_default(&self) -> bool {
/// Returns true if the count is zero and appling it would have no effect
pub fn is_zero(&self) -> bool {
self.count == 0 && self.count_in_session.map(|(_, c)| c == 0).unwrap_or(true)
}

pub fn invert(&self) -> Self {
/// Negates the counts.
pub fn negate(&self) -> Self {
Self {
count: -self.count,
count_in_session: self.count_in_session.map(|(s, c)| (s, -c)),
Expand Down

0 comments on commit d6e7503

Please sign in to comment.