From a3b4de8ccdffc99c125b7d1fc3fdd2d61109d6c7 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 12:25:34 +0100 Subject: [PATCH 01/13] improve task scope optimization --- .../turbo-tasks-memory/src/memory_backend.rs | 12 +--- crates/turbo-tasks-memory/src/task.rs | 56 ++++++++++--------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index c4c01192ee085d..36111a464b09e1 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -591,7 +591,7 @@ pub(crate) enum Job { ScheduleWhenDirtyFromScope(AutoSet), /// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to /// split off work. - AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool), + AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, usize), /// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to /// split off work. RemoveFromScopeQueue(VecDeque, TaskScopeId), @@ -638,17 +638,11 @@ impl Job { }) } } - Job::AddToScopeQueue(queue, id, is_optimization_scope) => { + Job::AddToScopeQueue(queue, id, merging_scopes) => { backend .scope_add_remove_priority .run_low(async { - run_add_to_scope_queue( - queue, - id, - is_optimization_scope, - backend, - turbo_tasks, - ); + run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks); }) .await; } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 418ce6f53394bf..d1e8f9bae70884 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -974,7 +974,7 @@ impl Task { pub(crate) fn add_to_scope_internal_shallow( &self, id: TaskScopeId, - is_optimization_scope: bool, + merging_scopes: usize, depth: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -1020,17 +1020,17 @@ impl Task { } if depth < usize::BITS as usize { - if is_optimization_scope { - *optimization_counter = - optimization_counter.saturating_sub(children.len() >> depth) + *optimization_counter += 1; + if merging_scopes > 0 { + *optimization_counter = optimization_counter.saturating_sub(merging_scopes); } else { - *optimization_counter += children.len() >> depth; - if *optimization_counter >= 0x10000 { + const SCOPE_OPTIMIZATION_THRESHOLD: usize = 1024; + if *optimization_counter * children.len() > SCOPE_OPTIMIZATION_THRESHOLD { list.remove(id); drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); return self.add_to_scope_internal_shallow( id, - is_optimization_scope, + merging_scopes, depth, backend, turbo_tasks, @@ -1060,22 +1060,15 @@ impl Task { pub(crate) fn add_to_scope_internal( &self, id: TaskScopeId, - is_optimization_scope: bool, + merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { // VecDeque::new() would allocate with 7 items capacity. We don't want that. let mut queue = VecDeque::with_capacity(0); - self.add_to_scope_internal_shallow( - id, - is_optimization_scope, - 0, - backend, - turbo_tasks, - &mut queue, - ); + self.add_to_scope_internal_shallow(id, merging_scopes, 0, backend, turbo_tasks, &mut queue); - run_add_to_scope_queue(queue, id, is_optimization_scope, backend, turbo_tasks); + run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks); } fn add_self_to_new_scope( @@ -1332,7 +1325,7 @@ impl Task { } } - pub(crate) fn remove_root_or_initial_scope( + fn remove_root_or_initial_scope( &self, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -1468,9 +1461,11 @@ impl Task { let schedule_self = self.add_self_to_new_scope(&mut state, root_scope, backend, turbo_tasks); + let mut merging_scopes = 0; // remove self from old scopes for (scope, count) in scopes.iter() { if *count > 0 { + merging_scopes += 1; self.remove_self_from_scope_full(&mut state, *scope, backend, turbo_tasks); } } @@ -1483,7 +1478,12 @@ impl Task { // Add children to new root scope for child in children.iter() { backend.with_task(*child, |child| { - child.add_to_scope_internal(root_scope, true, backend, turbo_tasks); + child.add_to_scope_internal( + root_scope, + merging_scopes, + backend, + turbo_tasks, + ); }) } @@ -1759,7 +1759,7 @@ impl Task { for scope in scopes.iter() { #[cfg(not(feature = "report_expensive"))] { - child.add_to_scope_internal(scope, false, backend, turbo_tasks); + child.add_to_scope_internal(scope, 0, backend, turbo_tasks); } #[cfg(feature = "report_expensive")] { @@ -1768,7 +1768,7 @@ impl Task { use turbo_tasks::util::FormatDuration; let start = Instant::now(); - child.add_to_scope_internal(scope, false, backend, turbo_tasks); + child.add_to_scope_internal(scope, 0, backend, turbo_tasks); let elapsed = start.elapsed(); if elapsed.as_millis() >= 10 { println!( @@ -2474,7 +2474,7 @@ const SPLIT_OFF_QUEUE_AT: usize = 100; pub fn run_add_to_scope_queue( mut queue: VecDeque<(TaskId, usize)>, id: TaskScopeId, - is_optimization_scope: bool, + merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { @@ -2482,7 +2482,7 @@ pub fn run_add_to_scope_queue( backend.with_task(child, |child| { child.add_to_scope_internal_shallow( id, - is_optimization_scope, + merging_scopes, depth, backend, turbo_tasks, @@ -2491,9 +2491,13 @@ pub fn run_add_to_scope_queue( }); while queue.len() > SPLIT_OFF_QUEUE_AT { let split_off_queue = queue.split_off(queue.len() - SPLIT_OFF_QUEUE_AT); - turbo_tasks.schedule_backend_foreground_job(backend.create_backend_job( - Job::AddToScopeQueue(split_off_queue, id, is_optimization_scope), - )); + turbo_tasks.schedule_backend_foreground_job( + backend.create_backend_job(Job::AddToScopeQueue( + split_off_queue, + id, + merging_scopes, + )), + ); } } } From 711a785bc887b5e6aa9755dbf29217402d6f9113 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 12:50:36 +0100 Subject: [PATCH 02/13] fix incorrect scope removals --- crates/turbo-tasks-memory/src/task.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index d1e8f9bae70884..4f097e2e183f68 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -1461,11 +1461,11 @@ impl Task { let schedule_self = self.add_self_to_new_scope(&mut state, root_scope, backend, turbo_tasks); - let mut merging_scopes = 0; + let mut merging_scopes = Vec::with_capacity(scopes.len()); // remove self from old scopes for (scope, count) in scopes.iter() { if *count > 0 { - merging_scopes += 1; + merging_scopes.push(*scope); self.remove_self_from_scope_full(&mut state, *scope, backend, turbo_tasks); } } @@ -1480,7 +1480,7 @@ impl Task { backend.with_task(*child, |child| { child.add_to_scope_internal( root_scope, - merging_scopes, + merging_scopes.len(), backend, turbo_tasks, ); @@ -1495,9 +1495,9 @@ impl Task { } // Remove children from old scopes - turbo_tasks.schedule_backend_foreground_job(backend.create_backend_job( - Job::RemoveFromScopes(children, scopes.into_iter().map(|(id, _)| id).collect()), - )); + turbo_tasks.schedule_backend_foreground_job( + backend.create_backend_job(Job::RemoveFromScopes(children, merging_scopes)), + ); None } else { Some(state) From 2374d0c6fa665d009d24760e54bdfd4442e3c3f1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 13:00:58 +0100 Subject: [PATCH 03/13] remove depth tracking --- .../turbo-tasks-memory/src/memory_backend.rs | 2 +- crates/turbo-tasks-memory/src/task.rs | 45 +++++++++---------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 36111a464b09e1..4a8a88c896c4d7 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -591,7 +591,7 @@ pub(crate) enum Job { ScheduleWhenDirtyFromScope(AutoSet), /// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to /// split off work. - AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, usize), + AddToScopeQueue(VecDeque, TaskScopeId, usize), /// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to /// split off work. RemoveFromScopeQueue(VecDeque, TaskScopeId), diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 4f097e2e183f68..c5628b5c2049ab 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -975,10 +975,9 @@ impl Task { &self, id: TaskScopeId, merging_scopes: usize, - depth: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - queue: &mut VecDeque<(TaskId, usize)>, + queue: &mut VecDeque, ) { let mut state = self.full_state_mut(); let TaskState { @@ -1019,31 +1018,28 @@ impl Task { return; } - if depth < usize::BITS as usize { - *optimization_counter += 1; - if merging_scopes > 0 { - *optimization_counter = optimization_counter.saturating_sub(merging_scopes); - } else { - const SCOPE_OPTIMIZATION_THRESHOLD: usize = 1024; - if *optimization_counter * children.len() > SCOPE_OPTIMIZATION_THRESHOLD { - list.remove(id); - drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); - return self.add_to_scope_internal_shallow( - id, - merging_scopes, - depth, - backend, - turbo_tasks, - queue, - ); - } + *optimization_counter += 1; + if merging_scopes > 0 { + *optimization_counter = optimization_counter.saturating_sub(merging_scopes); + } else { + const SCOPE_OPTIMIZATION_THRESHOLD: usize = 1024; + if *optimization_counter * children.len() > SCOPE_OPTIMIZATION_THRESHOLD { + list.remove(id); + drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); + return self.add_to_scope_internal_shallow( + id, + merging_scopes, + backend, + turbo_tasks, + queue, + ); } } if queue.capacity() == 0 { queue.reserve(max(children.len(), SPLIT_OFF_QUEUE_AT * 2)); } - queue.extend(children.iter().copied().map(|child| (child, depth + 1))); + queue.extend(children.iter().copied()); // add to dirty list of the scope (potentially schedule) let schedule_self = @@ -1066,7 +1062,7 @@ impl Task { ) { // VecDeque::new() would allocate with 7 items capacity. We don't want that. let mut queue = VecDeque::with_capacity(0); - self.add_to_scope_internal_shallow(id, merging_scopes, 0, backend, turbo_tasks, &mut queue); + self.add_to_scope_internal_shallow(id, merging_scopes, backend, turbo_tasks, &mut queue); run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks); } @@ -2472,18 +2468,17 @@ const SPLIT_OFF_QUEUE_AT: usize = 100; /// Adds a list of tasks and their children to a scope, recursively. pub fn run_add_to_scope_queue( - mut queue: VecDeque<(TaskId, usize)>, + mut queue: VecDeque, id: TaskScopeId, merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { - while let Some((child, depth)) = queue.pop_front() { + while let Some(child) = queue.pop_front() { backend.with_task(child, |child| { child.add_to_scope_internal_shallow( id, merging_scopes, - depth, backend, turbo_tasks, &mut queue, From 4ef69091a5495471270130121273c6d357091c76 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 13:43:39 +0100 Subject: [PATCH 04/13] inline with_scope/task --- crates/turbo-tasks-memory/src/memory_backend.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 4a8a88c896c4d7..0b59f756a0dbb4 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -123,10 +123,12 @@ impl MemoryBackend { } } + #[inline(always)] pub fn with_task(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T { func(self.memory_tasks.get(*id).unwrap()) } + #[inline(always)] pub fn with_scope(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T { func(self.memory_task_scopes.get(*id).unwrap()) } From b9412fe41f8809135a010b4cfb45e1a63084e2b6 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 17:14:21 +0100 Subject: [PATCH 05/13] refactor collectibles reading to use turbo-tasks functions for caching --- .../turbo-tasks-memory/src/count_hash_set.rs | 5 + .../turbo-tasks-memory/src/memory_backend.rs | 138 +++++++++++---- .../src/memory_backend_with_pg.rs | 5 +- crates/turbo-tasks-memory/src/scope.rs | 58 ++----- crates/turbo-tasks-memory/src/stats.rs | 9 +- crates/turbo-tasks-memory/src/task.rs | 161 +++++++++++++----- crates/turbo-tasks-testing/src/lib.rs | 7 +- crates/turbo-tasks/src/backend.rs | 11 +- crates/turbo-tasks/src/collectibles.rs | 22 +-- crates/turbo-tasks/src/manager.rs | 57 +++---- crates/turbo-tasks/src/raw_vc.rs | 20 +-- 11 files changed, 288 insertions(+), 205 deletions(-) diff --git a/crates/turbo-tasks-memory/src/count_hash_set.rs b/crates/turbo-tasks-memory/src/count_hash_set.rs index a805e86191471a..42c4b8ec345bbd 100644 --- a/crates/turbo-tasks-memory/src/count_hash_set.rs +++ b/crates/turbo-tasks-memory/src/count_hash_set.rs @@ -107,6 +107,11 @@ impl CountHashSet { self.add_count(item, 1) } + /// Returns the current count of an item + pub fn get(&self, item: &T) -> isize { + *self.inner.get(item).unwrap_or(&0) + } + /// Returns true when the value is no longer visible from outside pub fn remove_count(&mut self, item: T, count: usize) -> bool { match self.inner.entry(item) { diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 0b59f756a0dbb4..f1fb541448ff61 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -1,10 +1,10 @@ use std::{ - borrow::Cow, + borrow::{Borrow, Cow}, cell::RefCell, cmp::min, collections::VecDeque, future::Future, - hash::BuildHasherDefault, + hash::{BuildHasher, BuildHasherDefault, Hash}, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -24,6 +24,7 @@ use turbo_tasks::{ TransientTaskType, }, event::EventListener, + primitives::RawVcSetVc, util::{IdFactory, NoMoveVec}, CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, }; @@ -48,6 +49,8 @@ pub struct MemoryBackend { backend_jobs: NoMoveVec, backend_job_id_factory: IdFactory, task_cache: DashMap, TaskId, BuildHasherDefault>, + read_collectibles_task_cache: + DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault>, memory_limit: usize, gc_queue: Option, idle_gc_active: AtomicBool, @@ -76,6 +79,7 @@ impl MemoryBackend { backend_jobs: NoMoveVec::new(), backend_job_id_factory: IdFactory::new(), task_cache: DashMap::default(), + read_collectibles_task_cache: DashMap::default(), memory_limit, gc_queue: (memory_limit != usize::MAX).then(GcQueue::new), idle_gc_active: AtomicBool::new(false), @@ -263,6 +267,93 @@ impl MemoryBackend { } } } + + pub(crate) fn get_or_create_read_collectibles_task( + &self, + scope_id: TaskScopeId, + trait_type: TraitTypeId, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + if let Some(task) = self.lookup_and_connect_task( + parent_task, + &self.read_collectibles_task_cache, + &(scope_id, trait_type), + turbo_tasks, + ) { + // fast pass without creating a new task + task + } else { + // slow pass with key lock + let id = turbo_tasks.get_fresh_task_id(); + let task = + Task::new_read_collectibles(id, scope_id, trait_type, turbo_tasks.stats_type()); + // Safety: We have a fresh task id that nobody knows about yet + unsafe { + self.insert_and_connect_fresh_task( + parent_task, + &self.read_collectibles_task_cache, + (scope_id, trait_type), + id, + task, + turbo_tasks, + ) + } + } + } + + /// # Safty + /// + /// `new_id` must be an unused task id + unsafe fn insert_and_connect_fresh_task( + &self, + parent_task: TaskId, + task_cache: &DashMap, + key: K, + new_id: TaskId, + task: Task, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + // Safety: We have a fresh task id that nobody knows about yet + unsafe { + self.memory_tasks.insert(*new_id, task); + } + let result_task = match task_cache.entry(key) { + Entry::Vacant(entry) => { + // This is the most likely case + entry.insert(new_id); + new_id + } + Entry::Occupied(entry) => { + // Safety: We have a fresh task id that nobody knows about yet + unsafe { + self.memory_tasks.remove(*new_id); + turbo_tasks.reuse_task_id(new_id); + } + *entry.get() + } + }; + self.connect_task_child(parent_task, result_task, turbo_tasks); + result_task + } + + fn lookup_and_connect_task( + &self, + parent_task: TaskId, + task_cache: &DashMap, + key: &Q, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Option + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + task_cache.get(key).map(|task| { + self.connect_task_child(parent_task, *task, turbo_tasks); + + *task + }) + } } impl Backend for MemoryBackend { @@ -447,15 +538,15 @@ impl Backend for MemoryBackend { }) } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, id: TaskId, trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { self.with_task(id, |task| { - task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks) + task.read_task_collectibles(reader, trait_id, self, turbo_tasks) }) } @@ -521,12 +612,10 @@ impl Backend for MemoryBackend { parent_task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { - let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) { + if let Some(task) = + self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks) + { // fast pass without creating a new task - self.connect_task_child(parent_task, task, turbo_tasks); - - // TODO maybe force (background) scheduling to avoid inactive tasks hanging in - // "in progress" until they become active task } else { // It's important to avoid overallocating memory as this will go into the task @@ -538,27 +627,16 @@ impl Backend for MemoryBackend { let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type()); // Safety: We have a fresh task id that nobody knows about yet unsafe { - self.memory_tasks.insert(*id, task); + self.insert_and_connect_fresh_task( + parent_task, + &self.task_cache, + task_type, + id, + task, + turbo_tasks, + ) } - let result_task = match self.task_cache.entry(task_type) { - Entry::Vacant(entry) => { - // This is the most likely case - entry.insert(id); - id - } - Entry::Occupied(entry) => { - // Safety: We have a fresh task id that nobody knows about yet - unsafe { - self.memory_tasks.remove(*id); - turbo_tasks.reuse_task_id(id); - } - *entry.get() - } - }; - self.connect_task_child(parent_task, result_task, turbo_tasks); - result_task - }; - result + } } fn create_transient_task( diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index 19f0092b29079d..e3b9a47cfdee9a 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -26,6 +26,7 @@ use turbo_tasks::{ ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph, PersistedGraphApi, ReadTaskState, TaskCell, TaskData, }, + primitives::RawVcSetVc, util::{IdFactory, NoMoveVec, SharedError}, CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, }; @@ -1395,13 +1396,13 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, _task: TaskId, _trait_id: TraitTypeId, _reader: TaskId, _turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { todo!() } diff --git a/crates/turbo-tasks-memory/src/scope.rs b/crates/turbo-tasks-memory/src/scope.rs index 65c182a48db921..46f9c1cad28c39 100644 --- a/crates/turbo-tasks-memory/src/scope.rs +++ b/crates/turbo-tasks-memory/src/scope.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, fmt::{Debug, Display}, hash::Hash, mem::take, @@ -173,7 +172,7 @@ impl TaskScope { children: CountHashSet::new(), collectibles: AutoMap::new(), dependent_tasks: AutoSet::new(), - event: Event::new(|| { + event: Event::new(move || { #[cfg(feature = "print_scope_updates")] return format!("TaskScope({id})::event"); #[cfg(not(feature = "print_scope_updates"))] @@ -200,7 +199,7 @@ impl TaskScope { children: CountHashSet::new(), collectibles: AutoMap::new(), dependent_tasks: AutoSet::new(), - event: Event::new(|| { + event: Event::new(move || { #[cfg(feature = "print_scope_updates")] return format!("TaskScope({id})::event"); #[cfg(not(feature = "print_scope_updates"))] @@ -325,38 +324,18 @@ impl TaskScope { } } - pub fn read_collectibles( + pub fn read_collectibles_and_children( &self, self_id: TaskScopeId, trait_id: TraitTypeId, reader: TaskId, - backend: &MemoryBackend, - ) -> AutoSet { - let collectibles = self.read_collectibles_recursive( - self_id, - trait_id, - reader, - backend, - &mut HashMap::with_hasher(BuildNoHashHasher::default()), - ); - AutoSet::from_iter(collectibles.iter().copied()) - } - - fn read_collectibles_recursive( - &self, - self_id: TaskScopeId, - trait_id: TraitTypeId, - reader: TaskId, - backend: &MemoryBackend, - cache: &mut HashMap, BuildNoHashHasher>, - ) -> CountHashSet { - // TODO add reverse edges from task to scopes and (scope, trait_id) + ) -> (CountHashSet, Vec) { let mut state = self.state.lock(); let children = state.children.iter().copied().collect::>(); state.dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeChildren(self_id)); - let mut current = { + let current = { let (c, dependent_tasks) = state.collectibles.entry(trait_id).or_default(); dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeCollectibles(self_id, trait_id)); @@ -364,22 +343,7 @@ impl TaskScope { }; drop(state); - for id in children { - backend.with_scope(id, |scope| { - let child = if let Some(cached) = cache.get(&id) { - cached - } else { - let child = - scope.read_collectibles_recursive(id, trait_id, reader, backend, cache); - cache.entry(id).or_insert(child) - }; - for v in child.iter() { - current.add(*v); - } - }) - } - - current + (current, children) } pub(crate) fn remove_dependent_task(&self, reader: TaskId) { @@ -681,12 +645,18 @@ impl TaskScopeState { match self.collectibles.entry(trait_id) { Entry::Occupied(mut entry) => { let (collectibles, dependent_tasks) = entry.get_mut(); - if collectibles.remove_count(collectible, count) { + let old_value = collectibles.get(&collectible); + let new_value = old_value - count as isize; + if collectibles.remove_count(collectible, count) || new_value < 0 { let notify = take(dependent_tasks); if collectibles.is_unset() { entry.remove(); } - log_scope_update!("remove_collectible {} -> {}", *self.id, collectible); + log_scope_update!( + "remove_collectible {} -> {} ({old_value} -> {new_value})", + *self.id, + collectible + ); Some(ScopeCollectibleChangeEffect { notify }) } else { None diff --git a/crates/turbo-tasks-memory/src/stats.rs b/crates/turbo-tasks-memory/src/stats.rs index 215d65b5451c64..a62853736b38d0 100644 --- a/crates/turbo-tasks-memory/src/stats.rs +++ b/crates/turbo-tasks-memory/src/stats.rs @@ -23,6 +23,7 @@ pub struct StatsReferences { pub enum StatsTaskType { Root(TaskId), Once(TaskId), + ReadCollectibles(TraitTypeId), Native(FunctionId), ResolveNative(FunctionId), ResolveTrait(TraitTypeId, String), @@ -33,6 +34,9 @@ impl Display for StatsTaskType { match self { StatsTaskType::Root(_) => write!(f, "root"), StatsTaskType::Once(_) => write!(f, "once"), + StatsTaskType::ReadCollectibles(t) => { + write!(f, "read collectibles {}", registry::get_trait(*t).name) + } StatsTaskType::Native(nf) => write!(f, "{}", registry::get_function(*nf).name), StatsTaskType::ResolveNative(nf) => { write!(f, "resolve {}", registry::get_function(*nf).name) @@ -184,7 +188,10 @@ impl Stats { pub fn merge_resolve(&mut self) { self.merge(|ty, _stats| match ty { - StatsTaskType::Root(_) | StatsTaskType::Once(_) | StatsTaskType::Native(_) => false, + StatsTaskType::Root(_) + | StatsTaskType::Once(_) + | StatsTaskType::Native(_) + | StatsTaskType::ReadCollectibles(..) => false, StatsTaskType::ResolveNative(_) | StatsTaskType::ResolveTrait(_, _) => true, }) } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index c5628b5c2049ab..24b722d909bda5 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -24,7 +24,9 @@ use tokio::task_local; use turbo_tasks::{ backend::{PersistentTaskType, TaskExecutionSpec}, event::{Event, EventListener}, - get_invalidator, registry, CellId, Invalidator, RawVc, StatsType, TaskId, TraitTypeId, + get_invalidator, + primitives::{RawVcSet, RawVcSetVc}, + registry, CellId, Invalidator, RawVc, StatsType, TaskId, TraitTypeId, TryJoinIterExt, TurboTasksBackendApi, ValueTypeId, }; @@ -80,6 +82,11 @@ enum TaskType { /// applied. Once(OnceTaskFn), + /// A task that reads all collectibles of a certain trait from a + /// [TaskScope]. It will do that by recursively calling ReadCollectibles on + /// child scopes, so that results by scope are cached. + ReadCollectibles(TaskScopeId, TraitTypeId), + /// A normal persistent task Persistent(Arc), } @@ -89,6 +96,11 @@ impl Debug for TaskType { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), + Self::ReadCollectibles(scope_id, trait_id) => f + .debug_tuple("ReadCollectibles") + .field(scope_id) + .field(®istry::get_trait(*trait_id).name) + .finish(), Self::Persistent(ty) => Debug::fmt(ty, f), } } @@ -99,6 +111,7 @@ impl Display for TaskType { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), + Self::ReadCollectibles(..) => f.debug_tuple("ReadCollectibles").finish(), Self::Persistent(ty) => Display::fmt(ty, f), } } @@ -443,9 +456,28 @@ impl Task { } } + pub(crate) fn new_read_collectibles( + id: TaskId, + target_scope: TaskScopeId, + trait_type_id: TraitTypeId, + stats_type: StatsType, + ) -> Self { + let ty = TaskType::ReadCollectibles(target_scope, trait_type_id); + let description = Self::get_event_description_static(id, &ty); + Self { + id, + ty, + state: RwLock::new(TaskMetaState::Full(box TaskState::new( + description, + stats_type, + ))), + } + } + pub(crate) fn is_pure(&self) -> bool { match &self.ty { TaskType::Persistent(_) => true, + TaskType::ReadCollectibles(..) => true, TaskType::Root(_) => false, TaskType::Once(_) => false, } @@ -465,36 +497,18 @@ impl Task { } pub(crate) fn get_description(&self) -> String { - match &self.ty { - TaskType::Root(..) => format!("[{}] root", self.id), - TaskType::Once(..) => format!("[{}] once", self.id), - TaskType::Persistent(ty) => match &**ty { - PersistentTaskType::Native(native_fn, _) => { - format!("[{}] {}", self.id, registry::get_function(*native_fn).name) - } - PersistentTaskType::ResolveNative(native_fn, _) => { - format!( - "[{}] [resolve] {}", - self.id, - registry::get_function(*native_fn).name - ) - } - PersistentTaskType::ResolveTrait(trait_type, fn_name, _) => { - format!( - "[{}] [resolve trait] {} in trait {}", - self.id, - fn_name, - registry::get_trait(*trait_type).name - ) - } - }, - } + Self::format_description(&self.ty, self.id) } fn format_description(ty: &TaskType, id: TaskId) -> String { match ty { TaskType::Root(..) => format!("[{}] root", id), TaskType::Once(..) => format!("[{}] once", id), + TaskType::ReadCollectibles(_, trait_type_id) => format!( + "[{}] read collectibles({})", + id, + registry::get_trait(*trait_type_id).name + ), TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, _) => { format!("[{}] {}", id, registry::get_function(*native_fn).name) @@ -617,7 +631,7 @@ impl Task { if !self.try_start_execution(&mut state, turbo_tasks, backend) { return None; } - let future = self.make_execution_future(&mut state, turbo_tasks); + let future = self.make_execution_future(state, backend, turbo_tasks); Some(TaskExecutionSpec { future }) } @@ -672,24 +686,44 @@ impl Task { /// Prepares task execution and returns a future that will execute the task. fn make_execution_future( self: &Task, - mut state: &mut TaskState, + mut state: FullTaskWriteGuard<'_>, + backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) -> Pin> + Send>> { match &self.ty { - TaskType::Root(bound_fn) => bound_fn(), - TaskType::Once(mutex) => mutex.lock().take().expect("Task can only be executed once"), + TaskType::Root(bound_fn) => { + drop(state); + bound_fn() + } + TaskType::Once(mutex) => { + drop(state); + mutex.lock().take().expect("Task can only be executed once") + } + &TaskType::ReadCollectibles(scope_id, trait_type_id) => { + drop(state); + Task::make_read_collectibles_execution_future( + self.id, + scope_id, + trait_type_id, + backend, + turbo_tasks, + ) + } TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, inputs) => { - if let PrepareTaskType::Native(bound_fn) = &state.prepared_type { + let future = if let PrepareTaskType::Native(bound_fn) = &state.prepared_type { bound_fn() } else { let bound_fn = registry::get_function(*native_fn).bind(inputs); let future = bound_fn(); state.prepared_type = PrepareTaskType::Native(bound_fn); future - } + }; + drop(state); + future } PersistentTaskType::ResolveNative(ref native_fn, inputs) => { + drop(state); let native_fn = *native_fn; let inputs = inputs.clone(); let turbo_tasks = turbo_tasks.pin(); @@ -700,6 +734,7 @@ impl Task { )) } PersistentTaskType::ResolveTrait(trait_type, name, inputs) => { + drop(state); let trait_type = *trait_type; let name = name.clone(); let inputs = inputs.clone(); @@ -1662,6 +1697,9 @@ impl Task { match &self.ty { TaskType::Root(_) => StatsTaskType::Root(self.id), TaskType::Once(_) => StatsTaskType::Once(self.id), + TaskType::ReadCollectibles(_, trait_type_id) => { + StatsTaskType::ReadCollectibles(*trait_type_id) + } TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(f, _) => StatsTaskType::Native(*f), PersistentTaskType::ResolveNative(f, _) => StatsTaskType::ResolveNative(*f), @@ -1876,29 +1914,60 @@ impl Task { } } - pub(crate) fn try_read_task_collectibles( + fn make_read_collectibles_execution_future( + task_id: TaskId, + scope_id: TaskScopeId, + trait_type_id: TraitTypeId, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Pin> + Send>> { + let (mut current, children) = backend.with_scope(scope_id, |scope| { + scope.read_collectibles_and_children(scope_id, trait_type_id, task_id) + }); + log_scope_update!("reading collectibles from {scope_id}: {:?}", current); + let children = children + .into_iter() + .map(|child| { + let task = backend.get_or_create_read_collectibles_task( + child, + trait_type_id, + task_id, + &*turbo_tasks, + ); + // Safety: RawVcSet is a transparent value + unsafe { + RawVc::TaskOutput(task).into_transparent_read::>() + } + }) + .collect::>(); + Box::pin(async move { + let children = children.into_iter().try_join().await?; + for child in children { + for v in child.iter() { + current.add(*v); + } + } + Ok(RawVcSetVc::cell(current.iter().copied().collect()).into()) + }) + } + + pub(crate) fn read_task_collectibles( &self, reader: TaskId, trait_id: TraitTypeId, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { let mut state = self.full_state_mut(); state = self.ensure_root_scoped(state, backend, turbo_tasks); - // We need to wait for all foreground jobs to be finished as there could be - // ongoing add_to_scope jobs that need to be finished before reading - // from scopes - if let Err(listener) = turbo_tasks.try_foreground_done() { - return Ok(Err(listener)); - } if let TaskScopes::Root(scope_id) = state.scopes { - backend.with_scope(scope_id, |scope| { - if let Some(l) = scope.has_unfinished_tasks() { - return Ok(Err(l)); - } - let set = scope.read_collectibles(scope_id, trait_id, reader, backend); - Ok(Ok(set)) - }) + let task = backend.get_or_create_read_collectibles_task( + scope_id, + trait_id, + reader, + turbo_tasks, + ); + RawVc::TaskOutput(task).into() } else { panic!("It's not possible to read collectibles from a non-root scope") } diff --git a/crates/turbo-tasks-testing/src/lib.rs b/crates/turbo-tasks-testing/src/lib.rs index 526bca03adc3a9..0bcd8788d78aee 100644 --- a/crates/turbo-tasks-testing/src/lib.rs +++ b/crates/turbo-tasks-testing/src/lib.rs @@ -18,6 +18,7 @@ use auto_hash_map::AutoSet; use turbo_tasks::{ backend::CellContent, event::{Event, EventListener}, + primitives::RawVcSetVc, registry, test_helpers::{current_task_for_testing, with_turbo_tasks_for_testing}, CellId, RawVc, TaskId, TraitTypeId, TurboTasksApi, TurboTasksCallApi, @@ -179,11 +180,7 @@ impl TurboTasksApi for VcStorage { unimplemented!() } - fn try_read_task_collectibles( - &self, - _task: TaskId, - _trait_id: TraitTypeId, - ) -> Result, EventListener>> { + fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> RawVcSetVc { unimplemented!() } diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index 2f4ab3b4dd0567..ecd039b8586269 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -10,14 +10,13 @@ use std::{ }; use anyhow::{anyhow, Result}; -use auto_hash_map::AutoSet; use serde::{Deserialize, Serialize}; pub use crate::id::BackendJobId; use crate::{ - event::EventListener, manager::TurboTasksBackendApi, raw_vc::CellId, registry, - task_input::SharedReference, FunctionId, RawVc, ReadRef, TaskId, TaskIdProvider, TaskInput, - TraitTypeId, + event::EventListener, manager::TurboTasksBackendApi, primitives::RawVcSetVc, raw_vc::CellId, + registry, task_input::SharedReference, FunctionId, RawVc, ReadRef, TaskId, TaskIdProvider, + TaskInput, TraitTypeId, }; /// Different Task types @@ -276,13 +275,13 @@ pub trait Backend: Sync + Send { } } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, task: TaskId, trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>>; + ) -> RawVcSetVc; fn emit_collectible( &self, diff --git a/crates/turbo-tasks/src/collectibles.rs b/crates/turbo-tasks/src/collectibles.rs index b8fcc1f259c033..841ab151061a26 100644 --- a/crates/turbo-tasks/src/collectibles.rs +++ b/crates/turbo-tasks/src/collectibles.rs @@ -1,24 +1,4 @@ -use anyhow::{anyhow, Result}; - -use crate::{ - self as turbo_tasks, manager::read_task_collectibles, primitives::RawVcSetVc, - CollectiblesFuture, RawVc, TraitTypeId, -}; - -#[turbo_tasks::function] -#[allow(dead_code)] // It's used indirectly -pub async fn read_collectibles(raw: RawVc, trait_type: usize) -> Result { - if let RawVc::TaskOutput(task) = raw { - let tt = crate::turbo_tasks(); - let set = read_task_collectibles(&*tt, task, TraitTypeId::from(trait_type)).await?; - Ok(RawVcSetVc::cell(set)) - } else { - Err(anyhow!( - "peek/take_collectibles was called on Vc that points to a cell (instead of a Vc that \ - points to a task output)" - )) - } -} +use crate::{self as turbo_tasks, CollectiblesFuture}; pub trait CollectiblesSource { fn take_collectibles(self) -> CollectiblesFuture; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 0ffcbe3f0e3f3c..d96bb0740d312a 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -27,6 +27,7 @@ use crate::{ event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, id_factory::IdFactory, + primitives::RawVcSetVc, raw_vc::{CellId, RawVc}, registry, task_input::{SharedReference, TaskInput}, @@ -91,11 +92,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { index: CellId, ) -> Result>; - fn try_read_task_collectibles( - &self, - task: TaskId, - trait_id: TraitTypeId, - ) -> Result, EventListener>>; + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> RawVcSetVc; fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc); fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc); @@ -400,18 +397,25 @@ impl TurboTasks { if this.stopped.load(Ordering::Acquire) { return false; } - if let Some(execution) = this.backend.try_start_task_execution(task_id, &*this) - { - // Setup thread locals - let (result, duration, instant) = CELL_COUNTERS - .scope(Default::default(), async { - let (result, duration, instant) = TimedFuture::new( - AssertUnwindSafe(execution.future).catch_unwind(), + + // Setup thread locals + if let Some((result, duration, instant)) = CELL_COUNTERS + .scope(Default::default(), async { + if let Some(execution) = + this.backend.try_start_task_execution(task_id, &*this) + { + Some( + TimedFuture::new( + AssertUnwindSafe(execution.future).catch_unwind(), + ) + .await, ) - .await; - (result, duration, instant) - }) - .await; + } else { + None + } + }) + .await + { if cfg!(feature = "log_function_stats") && duration.as_millis() > 1000 { println!( "{} took {}", @@ -811,12 +815,8 @@ impl TurboTasksApi for TurboTasks { .try_read_own_task_cell_untracked(current_task, index, self) } - fn try_read_task_collectibles( - &self, - task: TaskId, - trait_id: TraitTypeId, - ) -> Result, EventListener>> { - self.backend.try_read_task_collectibles( + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> RawVcSetVc { + self.backend.read_task_collectibles( task, trait_id, current_task("reading collectibles"), @@ -1235,19 +1235,6 @@ pub(crate) async fn read_task_cell_untracked( } } -pub(crate) async fn read_task_collectibles( - this: &dyn TurboTasksApi, - id: TaskId, - trait_id: TraitTypeId, -) -> Result> { - loop { - match this.try_read_task_collectibles(id, trait_id)? { - Ok(result) => return Ok(result), - Err(listener) => listener.await, - } - } -} - pub struct CurrentCellRef { current_task: TaskId, index: CellId, diff --git a/crates/turbo-tasks/src/raw_vc.rs b/crates/turbo-tasks/src/raw_vc.rs index 192258cb9ef8d4..509955726d7cbf 100644 --- a/crates/turbo-tasks/src/raw_vc.rs +++ b/crates/turbo-tasks/src/raw_vc.rs @@ -1,7 +1,7 @@ use std::{ any::Any, fmt::{Debug, Display}, - future::{Future, IntoFuture}, + future::Future, hash::Hash, marker::PhantomData, pin::Pin, @@ -335,15 +335,10 @@ impl CollectiblesSource for RawVc { fn peek_collectibles(self) -> CollectiblesFuture { let tt = turbo_tasks(); tt.notify_scheduled_tasks(); - let set: RawVcSetVc = tt - .native_call( - *crate::collectibles::READ_COLLECTIBLES_FUNCTION_ID, - vec![self.into(), (*T::get_trait_type_id()).into()], - ) - .into(); + let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.into_future(), + inner: set.strongly_consistent(), take: false, phantom: PhantomData, } @@ -352,15 +347,10 @@ impl CollectiblesSource for RawVc { fn take_collectibles(self) -> CollectiblesFuture { let tt = turbo_tasks(); tt.notify_scheduled_tasks(); - let set: RawVcSetVc = tt - .native_call( - *crate::collectibles::READ_COLLECTIBLES_FUNCTION_ID, - vec![self.into(), (*T::get_trait_type_id()).into()], - ) - .into(); + let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.into_future(), + inner: set.strongly_consistent(), take: true, phantom: PhantomData, } From 6aa80d9abbb2fbe1e5c7dadc7d45b13e2871f70c Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 16 Feb 2023 22:19:59 +0100 Subject: [PATCH 06/13] avoid stringifying every task --- crates/turbo-tasks-memory/src/task.rs | 34 ++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 24b722d909bda5..d73e0956f1caf6 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -91,6 +91,24 @@ enum TaskType { Persistent(Arc), } +enum TaskTypeForDescription { + Root, + Once, + ReadCollectibles(TraitTypeId), + Persistent(Arc), +} + +impl TaskTypeForDescription { + fn from(task_type: &TaskType) -> Self { + match task_type { + TaskType::Root(..) => Self::Root, + TaskType::Once(..) => Self::Once, + TaskType::ReadCollectibles(.., trait_id) => Self::ReadCollectibles(*trait_id), + TaskType::Persistent(ty) => Self::Persistent(ty.clone()), + } + } +} + impl Debug for TaskType { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -497,19 +515,19 @@ impl Task { } pub(crate) fn get_description(&self) -> String { - Self::format_description(&self.ty, self.id) + Self::format_description(&TaskTypeForDescription::from(&self.ty), self.id) } - fn format_description(ty: &TaskType, id: TaskId) -> String { + fn format_description(ty: &TaskTypeForDescription, id: TaskId) -> String { match ty { - TaskType::Root(..) => format!("[{}] root", id), - TaskType::Once(..) => format!("[{}] once", id), - TaskType::ReadCollectibles(_, trait_type_id) => format!( + TaskTypeForDescription::Root => format!("[{}] root", id), + TaskTypeForDescription::Once => format!("[{}] once", id), + TaskTypeForDescription::ReadCollectibles(trait_type_id) => format!( "[{}] read collectibles({})", id, registry::get_trait(*trait_type_id).name ), - TaskType::Persistent(ty) => match &**ty { + TaskTypeForDescription::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, _) => { format!("[{}] {}", id, registry::get_function(*native_fn).name) } @@ -536,8 +554,8 @@ impl Task { id: TaskId, ty: &TaskType, ) -> impl Fn() -> String + Send + Sync { - let ty = Self::format_description(ty, id); - move || ty.clone() + let ty = TaskTypeForDescription::from(ty); + move || Self::format_description(&ty, id) } fn get_event_description(&self) -> impl Fn() -> String + Send + Sync { From 9df87fc792db680bd53ab3800d98d8f4974da29b Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 00:52:42 +0100 Subject: [PATCH 07/13] read collectibles task starts root scoped --- crates/turbo-tasks-memory/src/memory_backend.rs | 11 ++++++++--- crates/turbo-tasks-memory/src/task.rs | 11 +++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index f1fb541448ff61..4eed62373dd147 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -273,6 +273,7 @@ impl MemoryBackend { scope_id: TaskScopeId, trait_type: TraitTypeId, parent_task: TaskId, + root_scoped: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { if let Some(task) = self.lookup_and_connect_task( @@ -296,6 +297,7 @@ impl MemoryBackend { (scope_id, trait_type), id, task, + root_scoped, turbo_tasks, ) } @@ -312,11 +314,13 @@ impl MemoryBackend { key: K, new_id: TaskId, task: Task, + root_scoped: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { // Safety: We have a fresh task id that nobody knows about yet - unsafe { - self.memory_tasks.insert(*new_id, task); + let task = unsafe { self.memory_tasks.insert(*new_id, task) }; + if root_scoped { + task.make_root_scoped(self, turbo_tasks); } let result_task = match task_cache.entry(key) { Entry::Vacant(entry) => { @@ -633,6 +637,7 @@ impl Backend for MemoryBackend { task_type, id, task, + false, turbo_tasks, ) } @@ -698,7 +703,7 @@ impl Job { Job::RemoveFromScopes(tasks, scopes) => { for task in tasks { backend.with_task(task, |task| { - task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks) + task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks) }); } backend.scope_add_remove_priority.finish_high(); diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index d73e0956f1caf6..83b7025eb207e2 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -1410,6 +1410,15 @@ impl Task { } } + pub fn make_root_scoped( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + let state = self.full_state_mut(); + self.make_root_scoped_internal(state, backend, turbo_tasks); + } + fn make_root_scoped_internal<'a>( &self, mut state: FullTaskWriteGuard<'a>, @@ -1950,6 +1959,7 @@ impl Task { child, trait_type_id, task_id, + false, &*turbo_tasks, ); // Safety: RawVcSet is a transparent value @@ -1983,6 +1993,7 @@ impl Task { scope_id, trait_id, reader, + true, turbo_tasks, ); RawVc::TaskOutput(task).into() From 9fc93f54ea634b88f1913244117ecca2c6a3d2d0 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 00:58:06 +0100 Subject: [PATCH 08/13] use read collectibles task in table view --- crates/turbo-tasks-memory/src/memory_backend.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 4eed62373dd147..faa2999344a860 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -125,6 +125,14 @@ impl MemoryBackend { for id in self.task_cache.clone().into_read_only().values() { func(*id); } + for id in self + .read_collectibles_task_cache + .clone() + .into_read_only() + .values() + { + func(*id); + } } #[inline(always)] From c8517476833a606b47bbd493700cd9e2db0b8426 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 00:58:30 +0100 Subject: [PATCH 09/13] optimize scopes when adding children too --- crates/turbo-tasks-memory/src/task.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 83b7025eb207e2..e7385a8e9c968b 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -418,6 +418,8 @@ use self::meta_state::{ FullTaskWriteGuard, TaskMetaState, TaskMetaStateReadGuard, TaskMetaStateWriteGuard, }; +const SCOPE_OPTIMIZATION_THRESHOLD: usize = 255; + impl Task { pub(crate) fn new_persistent( id: TaskId, @@ -1075,7 +1077,6 @@ impl Task { if merging_scopes > 0 { *optimization_counter = optimization_counter.saturating_sub(merging_scopes); } else { - const SCOPE_OPTIMIZATION_THRESHOLD: usize = 1024; if *optimization_counter * children.len() > SCOPE_OPTIMIZATION_THRESHOLD { list.remove(id); drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); @@ -1813,6 +1814,13 @@ impl Task { ) { let mut state = self.full_state_mut(); if state.children.insert(child_id) { + if let TaskScopes::Inner(_, optimization_counter) = &state.scopes { + if *optimization_counter * state.children.len() > SCOPE_OPTIMIZATION_THRESHOLD { + state.children.remove(&child_id); + drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); + return self.connect_child(child_id, backend, turbo_tasks); + } + } let scopes = state.scopes.clone(); drop(state); From 538018ac6e2c2816e21a7035208bbcc3a085ae9c Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 01:08:03 +0100 Subject: [PATCH 10/13] add inline_add/remove_to/from_scope features --- crates/turbo-tasks-memory/Cargo.toml | 2 ++ crates/turbo-tasks-memory/src/task.rs | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/crates/turbo-tasks-memory/Cargo.toml b/crates/turbo-tasks-memory/Cargo.toml index 00710f11b40924..ab918a4e4dce8b 100644 --- a/crates/turbo-tasks-memory/Cargo.toml +++ b/crates/turbo-tasks-memory/Cargo.toml @@ -44,6 +44,8 @@ log_connect_tasks = [] report_expensive = [] print_scope_updates = [] print_task_invalidation = [] +inline_add_to_scope = [] +inline_remove_from_scope = [] [[bench]] name = "mod" diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index e7385a8e9c968b..d2501200bb4ac5 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -1554,6 +1554,17 @@ impl Task { } // Remove children from old scopes + #[cfg(feature = "inline_remove_from_scope")] + for task in children { + backend.with_task(task, |task| { + task.remove_from_scopes( + merging_scopes.iter().copied(), + backend, + turbo_tasks, + ) + }); + } + #[cfg(not(feature = "inline_remove_from_scope"))] turbo_tasks.schedule_backend_foreground_job( backend.create_backend_job(Job::RemoveFromScopes(children, merging_scopes)), ); @@ -2590,6 +2601,7 @@ pub fn run_add_to_scope_queue( &mut queue, ); }); + #[cfg(not(feature = "inline_add_to_scope"))] while queue.len() > SPLIT_OFF_QUEUE_AT { let split_off_queue = queue.split_off(queue.len() - SPLIT_OFF_QUEUE_AT); turbo_tasks.schedule_backend_foreground_job( @@ -2614,6 +2626,7 @@ pub fn run_remove_from_scope_queue( backend.with_task(child, |child| { child.remove_from_scope_internal_shallow(id, backend, turbo_tasks, &mut queue); }); + #[cfg(not(feature = "inline_remove_from_scope"))] while queue.len() > SPLIT_OFF_QUEUE_AT { let split_off_queue = queue.split_off(queue.len() - SPLIT_OFF_QUEUE_AT); From f6636a07d96ecf0af2636f2963a67de05361c58a Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 01:07:32 +0100 Subject: [PATCH 11/13] reduce function count --- crates/next-core/src/app_source.rs | 6 ++++++ crates/next-core/src/page_source.rs | 16 ++++++++++------ crates/next-dev/src/lib.rs | 23 +++++++++++++---------- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/crates/next-core/src/app_source.rs b/crates/next-core/src/app_source.rs index a2ae2a7f0df2d3..26fc0bb61e08da 100644 --- a/crates/next-core/src/app_source.rs +++ b/crates/next-core/src/app_source.rs @@ -505,6 +505,12 @@ async fn create_app_source_for_directory( ); } + let sources = sources + .into_iter() + .map(|source| source.resolve()) + .try_join() + .await?; + Ok(CombinedContentSource { sources }.cell()) } diff --git a/crates/next-core/src/page_source.rs b/crates/next-core/src/page_source.rs index 7cdaf13996e6d6..20076a5f3396d7 100644 --- a/crates/next-core/src/page_source.rs +++ b/crates/next-core/src/page_source.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use turbo_tasks::{ primitives::{BoolVc, StringVc, StringsVc}, trace::TraceRawVcs, - Value, + TryJoinIterExt, Value, }; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{ @@ -287,10 +287,10 @@ pub async fn create_page_source( Ok(CombinedContentSource { sources: vec![ // Match _next/404 first to ensure rewrites work properly. - force_not_found_source, - page_source.into(), - fallback_source.into(), - fallback_not_found_source, + force_not_found_source.resolve().await?, + page_source.as_content_source().resolve().await?, + fallback_source.as_content_source().resolve().await?, + fallback_not_found_source.resolve().await?, ], } .cell() @@ -656,7 +656,11 @@ async fn create_page_source_for_directory( sources.sort_by_key(|(k, _)| *k); Ok(CombinedContentSource { - sources: sources.into_iter().map(|(_, v)| v).collect(), + sources: sources + .into_iter() + .map(|(_, v)| v.resolve()) + .try_join() + .await?, } .cell()) } diff --git a/crates/next-dev/src/lib.rs b/crates/next-dev/src/lib.rs index d35383c03a866f..dcd825b4d032b7 100644 --- a/crates/next-dev/src/lib.rs +++ b/crates/next-dev/src/lib.rs @@ -351,7 +351,7 @@ async fn source( turbo_tasks: turbo_tasks.into(), } .cell() - .into(); + .as_content_source(); let static_source = StaticAssetsContentSourceVc::new(String::new(), project_path.join("public")).into(); let manifest_source = DevManifestContentSource { @@ -371,28 +371,31 @@ async fn source( roots: HashSet::from([main_source.into()]), } .cell() - .into(); + .as_content_source(); let main_source = main_source.into(); - let source_maps = SourceMapContentSourceVc::new(main_source).into(); - let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).into(); + let source_maps = SourceMapContentSourceVc::new(main_source).as_content_source(); + let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).as_content_source(); let img_source = NextImageContentSourceVc::new( CombinedContentSourceVc::new(vec![static_source, page_source]).into(), ) - .into(); + .as_content_source(); let router_source = NextRouterContentSourceVc::new(main_source, execution_context, next_config, server_addr) .into(); let source = RouterContentSource { routes: vec![ - ("__turbopack__/".to_string(), introspect), - ("__turbo_tasks__/".to_string(), viz), + ("__turbopack__/".to_string(), introspect.resolve().await?), + ("__turbo_tasks__/".to_string(), viz.resolve().await?), ( "__nextjs_original-stack-frame".to_string(), - source_map_trace, + source_map_trace.resolve().await?, ), // TODO: Load path from next.config.js - ("_next/image".to_string(), img_source), - ("__turbopack_sourcemap__/".to_string(), source_maps), + ("_next/image".to_string(), img_source.resolve().await?), + ( + "__turbopack_sourcemap__/".to_string(), + source_maps.resolve().await?, + ), ], fallback: router_source, } From 40e8a7d0ebe8bd3dc853d22685dd38b2057bf369 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 10:47:53 +0100 Subject: [PATCH 12/13] Revert "reduce function count" This reverts commit f6636a07d96ecf0af2636f2963a67de05361c58a. --- crates/next-core/src/app_source.rs | 6 ------ crates/next-core/src/page_source.rs | 16 ++++++---------- crates/next-dev/src/lib.rs | 23 ++++++++++------------- 3 files changed, 16 insertions(+), 29 deletions(-) diff --git a/crates/next-core/src/app_source.rs b/crates/next-core/src/app_source.rs index 26fc0bb61e08da..a2ae2a7f0df2d3 100644 --- a/crates/next-core/src/app_source.rs +++ b/crates/next-core/src/app_source.rs @@ -505,12 +505,6 @@ async fn create_app_source_for_directory( ); } - let sources = sources - .into_iter() - .map(|source| source.resolve()) - .try_join() - .await?; - Ok(CombinedContentSource { sources }.cell()) } diff --git a/crates/next-core/src/page_source.rs b/crates/next-core/src/page_source.rs index 20076a5f3396d7..7cdaf13996e6d6 100644 --- a/crates/next-core/src/page_source.rs +++ b/crates/next-core/src/page_source.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use turbo_tasks::{ primitives::{BoolVc, StringVc, StringsVc}, trace::TraceRawVcs, - TryJoinIterExt, Value, + Value, }; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{ @@ -287,10 +287,10 @@ pub async fn create_page_source( Ok(CombinedContentSource { sources: vec![ // Match _next/404 first to ensure rewrites work properly. - force_not_found_source.resolve().await?, - page_source.as_content_source().resolve().await?, - fallback_source.as_content_source().resolve().await?, - fallback_not_found_source.resolve().await?, + force_not_found_source, + page_source.into(), + fallback_source.into(), + fallback_not_found_source, ], } .cell() @@ -656,11 +656,7 @@ async fn create_page_source_for_directory( sources.sort_by_key(|(k, _)| *k); Ok(CombinedContentSource { - sources: sources - .into_iter() - .map(|(_, v)| v.resolve()) - .try_join() - .await?, + sources: sources.into_iter().map(|(_, v)| v).collect(), } .cell()) } diff --git a/crates/next-dev/src/lib.rs b/crates/next-dev/src/lib.rs index dcd825b4d032b7..d35383c03a866f 100644 --- a/crates/next-dev/src/lib.rs +++ b/crates/next-dev/src/lib.rs @@ -351,7 +351,7 @@ async fn source( turbo_tasks: turbo_tasks.into(), } .cell() - .as_content_source(); + .into(); let static_source = StaticAssetsContentSourceVc::new(String::new(), project_path.join("public")).into(); let manifest_source = DevManifestContentSource { @@ -371,31 +371,28 @@ async fn source( roots: HashSet::from([main_source.into()]), } .cell() - .as_content_source(); + .into(); let main_source = main_source.into(); - let source_maps = SourceMapContentSourceVc::new(main_source).as_content_source(); - let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).as_content_source(); + let source_maps = SourceMapContentSourceVc::new(main_source).into(); + let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).into(); let img_source = NextImageContentSourceVc::new( CombinedContentSourceVc::new(vec![static_source, page_source]).into(), ) - .as_content_source(); + .into(); let router_source = NextRouterContentSourceVc::new(main_source, execution_context, next_config, server_addr) .into(); let source = RouterContentSource { routes: vec![ - ("__turbopack__/".to_string(), introspect.resolve().await?), - ("__turbo_tasks__/".to_string(), viz.resolve().await?), + ("__turbopack__/".to_string(), introspect), + ("__turbo_tasks__/".to_string(), viz), ( "__nextjs_original-stack-frame".to_string(), - source_map_trace.resolve().await?, + source_map_trace, ), // TODO: Load path from next.config.js - ("_next/image".to_string(), img_source.resolve().await?), - ( - "__turbopack_sourcemap__/".to_string(), - source_maps.resolve().await?, - ), + ("_next/image".to_string(), img_source), + ("__turbopack_sourcemap__/".to_string(), source_maps), ], fallback: router_source, } From 4494954d358cb1cc47badbe503e35458bfa6ea87 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 Feb 2023 10:52:22 +0100 Subject: [PATCH 13/13] reduce ContentSource::get resolve tasks --- crates/turbopack-dev-server/src/source/combined.rs | 8 ++++++-- crates/turbopack-dev-server/src/source/resolve.rs | 2 +- crates/turbopack-dev-server/src/source/router.rs | 8 ++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/turbopack-dev-server/src/source/combined.rs b/crates/turbopack-dev-server/src/source/combined.rs index 300fc76f40af72..801e02cb5ed772 100644 --- a/crates/turbopack-dev-server/src/source/combined.rs +++ b/crates/turbopack-dev-server/src/source/combined.rs @@ -102,8 +102,12 @@ impl PausableCombinedContentSource { // we've skipped to exactly the source which requested data. Requery the source // with it's partially computed path and needed data. let result = match pending.take() { - Some(pending) => pending.source.get(&pending.path, mem::take(&mut data)), - None => source.get(path, Default::default()), + Some(pending) => pending + .source + .resolve() + .await? + .get(&pending.path, mem::take(&mut data)), + None => source.resolve().await?.get(path, Default::default()), }; let res = result.await?; diff --git a/crates/turbopack-dev-server/src/source/resolve.rs b/crates/turbopack-dev-server/src/source/resolve.rs index 96e827842382a0..9dc0e241350f24 100644 --- a/crates/turbopack-dev-server/src/source/resolve.rs +++ b/crates/turbopack-dev-server/src/source/resolve.rs @@ -77,7 +77,7 @@ pub async fn resolve_source_request( let new_asset_path = urlencoding::decode(&new_uri.path()[1..])?.into_owned(); - current_source = new_source; + current_source = new_source.resolve().await?; request_overwrites.uri = new_uri; current_asset_path = new_asset_path; data = ContentSourceData::default(); diff --git a/crates/turbopack-dev-server/src/source/router.rs b/crates/turbopack-dev-server/src/source/router.rs index 3b9e8a49ad1a3b..c3f7e5f0bbe8e2 100644 --- a/crates/turbopack-dev-server/src/source/router.rs +++ b/crates/turbopack-dev-server/src/source/router.rs @@ -28,9 +28,13 @@ impl RouterContentSource { #[turbo_tasks::value_impl] impl ContentSource for RouterContentSource { #[turbo_tasks::function] - fn get(&self, path: &str, data: Value) -> ContentSourceResultVc { + async fn get( + &self, + path: &str, + data: Value, + ) -> Result { let (source, path) = self.get_source(path); - source.get(path, data) + Ok(source.resolve().await?.get(path, data)) } #[turbo_tasks::function]