Skip to content

Commit

Permalink
aggregation improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 5, 2024
1 parent 8bbd844 commit 75dc280
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn get_followers_with_aggregation_number(
aggregation_number: u32,
) -> Vec<TaskId> {
if is_aggregating_node(aggregation_number) {
get_many!(task, Follower { task } => task)
get_many!(task, Follower { task } count if count > 0 => task)
} else {
get_many!(task, Child { task } => task)
}
Expand All @@ -37,11 +37,19 @@ fn get_followers(task: &TaskGuard<'_>) -> Vec<TaskId> {
get_followers_with_aggregation_number(task, get_aggregation_number(task))
}

pub fn get_uppers(task: &TaskGuard<'_>) -> Vec<TaskId> {
get_many!(task, Upper { task } count if count > 0 => task)
}

fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator<Item = TaskId> + 'a {
iter_many!(task, Upper { task } count if count > 0 => task)
}

pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 {
get!(task, AggregationNumber).copied().unwrap_or_default()
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum AggregationUpdateJob {
UpdateAggregationNumber {
task_id: TaskId,
Expand Down Expand Up @@ -80,9 +88,9 @@ pub enum AggregationUpdateJob {
},
}

#[derive(Default, Serialize, Deserialize, Clone)]
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct AggregatedDataUpdate {
unfinished: i32,
dirty_task_count: i32,
dirty_tasks_update: HashMap<TaskId, i32>,
// TODO collectibles
}
Expand All @@ -92,7 +100,7 @@ impl AggregatedDataUpdate {
let aggregation = get_aggregation_number(task);
let dirty = get!(task, Dirty).is_some();
if is_aggregating_node(aggregation) {
let mut unfinished = get!(task, AggregatedDirtyTaskCount).copied().unwrap_or(0);
let mut dirty_task_count = get!(task, AggregatedDirtyTaskCount).copied().unwrap_or(0);
let mut dirty_tasks_update = task
.iter()
.filter_map(|(key, _)| match *key {
Expand All @@ -101,11 +109,11 @@ impl AggregatedDataUpdate {
})
.collect::<HashMap<_, _>>();
if dirty {
unfinished += 1;
dirty_task_count += 1;
dirty_tasks_update.insert(task.id(), 1);
}
Self {
unfinished: unfinished as i32,
dirty_task_count: dirty_task_count as i32,

Check failure on line 116 in turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

View workflow job for this annotation

GitHub Actions / rust check / build

casting to the same type is unnecessary (`i32` -> `i32`)
dirty_tasks_update,
}
} else if dirty {
Expand All @@ -116,7 +124,7 @@ impl AggregatedDataUpdate {
}

fn invert(mut self) -> Self {
self.unfinished = -self.unfinished;
self.dirty_task_count = -self.dirty_task_count;
for value in self.dirty_tasks_update.values_mut() {
*value = -*value;
}
Expand All @@ -129,27 +137,22 @@ impl AggregatedDataUpdate {
queue: &mut AggregationUpdateQueue,
) -> AggregatedDataUpdate {
let Self {
unfinished,
dirty_task_count,
dirty_tasks_update,
} = self;
let mut result = Self::default();
if *unfinished != 0 {
update!(task, AggregatedDirtyTaskCount, |old: Option<u32>| {
if *dirty_task_count != 0 {
update!(task, AggregatedDirtyTaskCount, |old: Option<i32>| {
let old = old.unwrap_or(0);
let new = old as i32 + *unfinished;
debug_assert!(new >= 0);
let new = new as u32;
if new == 0 {
result.unfinished = -1;
None
} else {
if old <= 0 && new > 0 {
result.unfinished = 1;
}
Some(new)
let new = old + *dirty_task_count;
if old <= 0 && new > 0 {
result.dirty_task_count = 1;
} else if old > 0 && new <= 0 {
result.dirty_task_count = -1;
}
(new != 0).then_some(new)
});
if result.unfinished == -1 {
if result.dirty_task_count == -1 {
if let Some(root_state) = get!(task, AggregateRoot) {
root_state.all_clean_event.notify(usize::MAX);
}
Expand All @@ -162,23 +165,18 @@ impl AggregatedDataUpdate {
update!(
task,
AggregatedDirtyTask { task: *task_id },
|old: Option<u32>| {
|old: Option<i32>| {
let old = old.unwrap_or(0);
let new = old as i32 + *count;
debug_assert!(new >= 0);
let new = new as u32;
if new == 0 {
result.dirty_tasks_update.insert(*task_id, -1);
None
} else {
if old <= 0 && new > 0 {
if root {
task_to_schedule.push(*task_id);
}
result.dirty_tasks_update.insert(*task_id, 1);
let new = old + *count;
if old <= 0 && new > 0 {
if root {
task_to_schedule.push(*task_id);
}
Some(new)
result.dirty_tasks_update.insert(*task_id, 1);
} else if old > 0 && new <= 0 {
result.dirty_tasks_update.insert(*task_id, -1);
}
(new != 0).then_some(new)
}
);
}
Expand All @@ -193,22 +191,22 @@ impl AggregatedDataUpdate {

fn is_empty(&self) -> bool {
let Self {
unfinished,
dirty_task_count,
dirty_tasks_update,
} = self;
*unfinished == 0 && dirty_tasks_update.is_empty()
*dirty_task_count == 0 && dirty_tasks_update.is_empty()
}

pub fn dirty_task(task_id: TaskId) -> Self {
Self {
unfinished: 1,
dirty_task_count: 1,
dirty_tasks_update: HashMap::from([(task_id, 1)]),
}
}

pub fn no_longer_dirty_task(task_id: TaskId) -> Self {
Self {
unfinished: -1,
dirty_task_count: -1,
dirty_tasks_update: HashMap::from([(task_id, -1)]),
}
}
Expand Down Expand Up @@ -236,7 +234,7 @@ impl Add for AggregatedDataUpdate {
}
}
Self {
unfinished: self.unfinished + rhs.unfinished,
dirty_task_count: self.dirty_task_count + rhs.dirty_task_count,
dirty_tasks_update,
}
}
Expand Down Expand Up @@ -297,13 +295,21 @@ impl AggregationUpdateQueue {
if is_aggregating_node(aggregation_number) {
// followers might become inner nodes when the aggregation number is
// increased
let followers = iter_many!(task, Follower { task } => task);
let followers =
iter_many!(task, Follower { task } count if count > 0 => task);
for follower_id in followers {
self.jobs.push_back(AggregationUpdateJob::BalanceEdge {
upper_id: task_id,
task_id: follower_id,
});
}
let uppers = iter_uppers(&task);
for upper_id in uppers {
self.jobs.push_back(AggregationUpdateJob::BalanceEdge {
upper_id,
task_id,
});
}
} else {
let children = iter_many!(task, Child { task } => task);
for child_id in children {
Expand Down Expand Up @@ -383,18 +389,20 @@ impl AggregationUpdateQueue {
let children: Vec<_> = get_followers(&follower);
drop(follower);

for upper_id in upper_ids.iter() {
// add data to upper
let mut upper = ctx.task(*upper_id);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_many!(upper, Upper { task } => task);
self.jobs.push_back(
AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
},
)
if !data.is_empty() {
for upper_id in upper_ids.iter() {
// add data to upper
let mut upper = ctx.task(*upper_id);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.jobs.push_back(
AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
},
)
}
}
}
if !children.is_empty() {
Expand Down Expand Up @@ -471,17 +479,20 @@ impl AggregationUpdateQueue {
let children: Vec<_> = get_followers(&follower);
drop(follower);

for upper_id in upper_ids.iter() {
// remove data from upper
let mut upper = ctx.task(*upper_id);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_many!(upper, Upper { task } => task);
self.jobs
.push_back(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
})
if !data.is_empty() {
for upper_id in upper_ids.iter() {
// remove data from upper
let mut upper = ctx.task(*upper_id);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.jobs.push_back(
AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
},
)
}
}
}
if !children.is_empty() {
Expand Down Expand Up @@ -517,7 +528,7 @@ impl AggregationUpdateQueue {
let mut upper = ctx.task(upper_id);
let diff = update.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task);
let upper_ids = get_uppers(&upper);
if !upper_ids.is_empty() {
self.jobs
.push_back(AggregationUpdateJob::AggregatedDataUpdate {
Expand All @@ -529,17 +540,14 @@ impl AggregationUpdateQueue {
}
}
AggregationUpdateJob::DataUpdate { task_id, update } => {
let mut task = ctx.task(task_id);
let diff = update.apply(&mut task, self);
if !diff.is_empty() {
let upper_ids: Vec<_> = get_many!(task, Upper { task } => task);
if !upper_ids.is_empty() {
self.jobs
.push_back(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
});
}
let task = ctx.task(task_id);
let upper_ids: Vec<_> = get_uppers(&task);
if !upper_ids.is_empty() {
self.jobs
.push_back(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: update.clone(),
});
}
}
AggregationUpdateJob::ScheduleWhenDirty { task_ids } => {
Expand All @@ -565,16 +573,13 @@ impl AggregationUpdateQueue {
if should_be_inner {
// remove all follower edges
let count = remove!(upper, Follower { task: task_id }).unwrap_or_default();
if count > 0 {
// notify uppers about lost follower
let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task);
if !upper_ids.is_empty() {
self.jobs
.push_back(AggregationUpdateJob::InnerLostFollower {
upper_ids,
lost_follower_id: task_id,
});
}
if count < 0 {

Check failure on line 576 in turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

View workflow job for this annotation

GitHub Actions / rust check / build

`if` chain can be rewritten with `match`
upper.add_new(CachedDataItem::Follower {
task: task_id,
value: count,
})
} else if count > 0 {
let upper_ids = get_uppers(&upper);

// Add the same amount of upper edges
if update_count!(task, Upper { task: upper_id }, count as i32) {
Expand All @@ -584,7 +589,6 @@ impl AggregationUpdateQueue {
let followers = get_followers(&task);
let diff = data.apply(&mut upper, self);

let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task);
if !upper_ids.is_empty() {
if !diff.is_empty() {
// Notify uppers about changed aggregated data
Expand All @@ -598,26 +602,36 @@ impl AggregationUpdateQueue {
if !followers.is_empty() {
self.jobs.push_back(
AggregationUpdateJob::InnerHasNewFollowers {
upper_ids,
upper_ids: upper_ids.clone(),
new_follower_ids: followers,
},
);
}
}
}

// notify uppers about lost follower
if !upper_ids.is_empty() {
self.jobs
.push_back(AggregationUpdateJob::InnerLostFollower {
upper_ids,
lost_follower_id: task_id,
});
}
}
} else if should_be_follower {
// Remove the upper edge
let count = remove!(task, Upper { task: upper_id }).unwrap_or_default();
if count > 0 {
let upper_ids: Vec<_> = get_uppers(&upper);

// Add the same amount of follower edges
if update_count!(upper, Follower { task: task_id }, count as i32) {
// notify uppers about new follower
let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task);
if !upper_ids.is_empty() {
self.jobs.push_back(
AggregationUpdateJob::InnerHasNewFollower {
upper_ids,
upper_ids: upper_ids.clone(),
new_follower_id: task_id,
},
);
Expand All @@ -629,7 +643,6 @@ impl AggregationUpdateQueue {
let data = AggregatedDataUpdate::from_task(&mut task).invert();
let followers = get_followers(&task);
let diff = data.apply(&mut upper, self);
let upper_ids: Vec<_> = get_many!(upper, Upper { task } => task);
if !upper_ids.is_empty() {
if !diff.is_empty() {
self.jobs.push_back(
Expand Down
Loading

0 comments on commit 75dc280

Please sign in to comment.