diff --git a/crates/turbo-tasks-memory/src/cell.rs b/crates/turbo-tasks-memory/src/cell.rs index b748f3c9c538a2..07d4f0b61fdb31 100644 --- a/crates/turbo-tasks-memory/src/cell.rs +++ b/crates/turbo-tasks-memory/src/cell.rs @@ -11,6 +11,8 @@ use turbo_tasks::{ TaskId, TurboTasksBackendApi, }; +use crate::MemoryBackend; + #[derive(Default, Debug)] pub(crate) enum Cell { /// No content has been set yet, or it was removed for memory pressure @@ -204,7 +206,11 @@ impl Cell { } } - pub fn assign(&mut self, content: CellContent, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn assign( + &mut self, + content: CellContent, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { match self { Cell::Empty => { *self = Cell::Value { @@ -287,7 +293,7 @@ impl Cell { } /// Drops the cell after GC. Will notify all dependent tasks and events. - pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { match self { Cell::Empty => {} Cell::Recomputing { diff --git a/crates/turbo-tasks-memory/src/gc.rs b/crates/turbo-tasks-memory/src/gc.rs index 0cc131e1f4e3d9..8bc89f8854c06a 100644 --- a/crates/turbo-tasks-memory/src/gc.rs +++ b/crates/turbo-tasks-memory/src/gc.rs @@ -152,7 +152,7 @@ impl GcQueue { &self, factor: u8, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option<(GcPriority, usize, GcStats)> { // Process through the inactive propagation queue. while let Ok(task) = self.inactive_propagate_queue.pop() { diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 5aa005e32ac09a..4ca882e349f3b6 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -50,8 +50,6 @@ pub struct MemoryBackend { backend_jobs: NoMoveVec, backend_job_id_factory: IdFactory, task_cache: DashMap, TaskId, BuildHasherDefault>, - read_collectibles_task_cache: - DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault>, memory_limit: usize, gc_queue: Option, idle_gc_active: AtomicBool, @@ -80,7 +78,6 @@ impl MemoryBackend { backend_jobs: NoMoveVec::new(), backend_job_id_factory: IdFactory::new(), task_cache: DashMap::default(), - read_collectibles_task_cache: DashMap::default(), memory_limit, gc_queue: (memory_limit != usize::MAX).then(GcQueue::new), idle_gc_active: AtomicBool::new(false), @@ -92,7 +89,7 @@ impl MemoryBackend { &self, parent: TaskId, child: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(parent, |parent| { parent.connect_child(child, self, turbo_tasks) @@ -114,7 +111,7 @@ impl MemoryBackend { id: TaskId, strongly_consistent: bool, note: impl Fn() -> String + Sync + Send + 'static, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, func: F, ) -> Result> { self.with_task(id, |task| { @@ -126,14 +123,6 @@ impl MemoryBackend { for id in self.task_cache.clone().into_read_only().values() { func(*id); } - for id in self - .read_collectibles_task_cache - .clone() - .into_read_only() - .values() - { - func(*id); - } } #[inline(always)] @@ -155,10 +144,19 @@ impl MemoryBackend { id } + pub fn create_new_no_collectibles_scope(&self, tasks: usize) -> TaskScopeId { + let id = self.scope_id_factory.get(); + unsafe { + self.memory_task_scopes + .insert(*id, TaskScope::new_no_collectibles(id, tasks)); + } + id + } + fn increase_scope_active_queue( &self, mut queue: Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { while let Some(scope) = queue.pop() { if let Some(tasks) = self.with_scope(scope, |scope| { @@ -174,7 +172,7 @@ impl MemoryBackend { pub(crate) fn increase_scope_active( &self, scope: TaskScopeId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.increase_scope_active_queue(vec![scope], turbo_tasks); } @@ -183,7 +181,7 @@ impl MemoryBackend { &self, scope: TaskScopeId, count: usize, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut queue = Vec::new(); if let Some(tasks) = self.with_scope(scope, |scope| { @@ -200,7 +198,7 @@ impl MemoryBackend { &self, scope: TaskScopeId, task_id: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.decrease_scope_active_by(scope, task_id, 1, turbo_tasks); } @@ -210,7 +208,7 @@ impl MemoryBackend { scope_id: TaskScopeId, task_id: TaskId, count: usize, - _turbo_tasks: &dyn TurboTasksBackendApi, + _turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut queue = Vec::new(); self.with_scope(scope_id, |scope| { @@ -239,7 +237,7 @@ impl MemoryBackend { } } - pub fn run_gc(&self, idle: bool, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn run_gc(&self, idle: bool, turbo_tasks: &dyn TurboTasksBackendApi) { if let Some(gc_queue) = &self.gc_queue { const MAX_COLLECT_FACTOR: u8 = u8::MAX / 8; @@ -277,43 +275,65 @@ impl MemoryBackend { } } - pub(crate) fn get_or_create_read_collectibles_task( + pub(crate) fn get_or_create_read_task_collectibles_task( + &self, + task_id: TaskId, + trait_type: TraitTypeId, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + self.with_task(task_id, |task| { + let id = task.get_read_collectibles_task(trait_type, || { + let scope = self.create_new_no_collectibles_scope(1); + + let id = turbo_tasks.get_fresh_task_id().into(); + let task = Task::new_read_task_collectibles( + // Safety: That task will hold the value, but we are still in + // control of the task + id, + scope, + task_id, + trait_type, + turbo_tasks.stats_type(), + ); + // Safety: We have a fresh task id that nobody knows about yet + unsafe { self.memory_tasks.insert(*id, task) }; + self.with_scope(scope, |scope| { + scope.state.lock().add_dirty_task(id); + }); + id + }); + self.connect_task_child(parent_task, id, turbo_tasks); + id + }) + } + + pub(crate) fn get_or_create_read_scope_collectibles_task( &self, scope_id: TaskScopeId, trait_type: TraitTypeId, parent_task: TaskId, - root_scoped: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { - if let Some(task) = self.lookup_and_connect_task( - parent_task, - &self.read_collectibles_task_cache, - &(scope_id, trait_type), - turbo_tasks, - ) { - // fast pass without creating a new task - task - } else { - // slow pass with key lock - let id = turbo_tasks.get_fresh_task_id(); - let task = Task::new_read_collectibles( - // Safety: That task will hold the value, but we are still in - // control of the task - *unsafe { id.get_unchecked() }, - scope_id, - trait_type, - turbo_tasks.stats_type(), - ); - self.insert_and_connect_fresh_task( - parent_task, - &self.read_collectibles_task_cache, - (scope_id, trait_type), - id, - task, - root_scoped, - turbo_tasks, - ) - } + self.with_scope(scope_id, |scope| { + let mut state = scope.state.lock(); + let task_id = state.get_read_collectibles_task(trait_type, || { + let id = turbo_tasks.get_fresh_task_id().into(); + let task = Task::new_read_scope_collectibles( + // Safety: That task will hold the value, but we are still in + // control of the task + id, + scope_id, + trait_type, + turbo_tasks.stats_type(), + ); + // Safety: We have a fresh task id that nobody knows about yet + unsafe { self.memory_tasks.insert(*id, task) }; + id + }); + self.connect_task_child(parent_task, task_id, turbo_tasks); + task_id + }) } fn insert_and_connect_fresh_task( @@ -324,7 +344,7 @@ impl MemoryBackend { new_id: Unused, task: Task, root_scoped: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { let new_id = new_id.into(); // Safety: We have a fresh task id that nobody knows about yet @@ -357,7 +377,7 @@ impl MemoryBackend { parent_task: TaskId, task_cache: &DashMap, key: &Q, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option where K: Borrow, @@ -372,7 +392,7 @@ impl MemoryBackend { } impl Backend for MemoryBackend { - fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi) { if self .idle_gc_active .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) @@ -383,11 +403,15 @@ impl Backend for MemoryBackend { } } - fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { self.with_task(task, |task| task.invalidate(self, turbo_tasks)); } - fn invalidate_tasks(&self, tasks: Vec, turbo_tasks: &dyn TurboTasksBackendApi) { + fn invalidate_tasks( + &self, + tasks: Vec, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { for task in tasks.into_iter() { self.with_task(task, |task| { task.invalidate(self, turbo_tasks); @@ -412,7 +436,7 @@ impl Backend for MemoryBackend { fn try_start_task_execution( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option { self.with_task(task, |task| task.execute(self, turbo_tasks)) } @@ -421,7 +445,7 @@ impl Backend for MemoryBackend { &self, task: TaskId, result: Result, Option>>, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { task.execution_result(result, self, turbo_tasks); @@ -434,7 +458,7 @@ impl Backend for MemoryBackend { duration: Duration, instant: Instant, stateful: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { let reexecute = self.with_task(task_id, |task| { task.execution_completed(duration, instant, stateful, self, turbo_tasks) @@ -453,7 +477,7 @@ impl Backend for MemoryBackend { task: TaskId, reader: TaskId, strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { if task == reader { bail!("reading it's own output is not possible"); @@ -474,7 +498,7 @@ impl Backend for MemoryBackend { &self, task: TaskId, strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.try_get_output( task, @@ -490,7 +514,7 @@ impl Backend for MemoryBackend { task_id: TaskId, index: CellId, reader: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { if task_id == reader { Ok(Ok(self.with_task(task_id, |task| { @@ -522,7 +546,7 @@ impl Backend for MemoryBackend { &self, current_task: TaskId, index: CellId, - _turbo_tasks: &dyn TurboTasksBackendApi, + _turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result { Ok(self.with_task(current_task, |task| { task.with_cell(index, |cell| cell.read_own_content_untracked()) @@ -533,7 +557,7 @@ impl Backend for MemoryBackend { &self, task_id: TaskId, index: CellId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { match task.with_cell_mut(index, |cell| { @@ -558,7 +582,7 @@ impl Backend for MemoryBackend { id: TaskId, trait_id: TraitTypeId, reader: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> RawVcSetVc { self.with_task(id, |task| { task.read_task_collectibles(reader, trait_id, self, turbo_tasks) @@ -570,7 +594,7 @@ impl Backend for MemoryBackend { trait_type: TraitTypeId, collectible: RawVc, id: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(id, |task| { task.emit_collectible(trait_type, collectible, self, turbo_tasks) @@ -582,7 +606,7 @@ impl Backend for MemoryBackend { trait_type: TraitTypeId, collectible: RawVc, id: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(id, |task| { task.unemit_collectible(trait_type, collectible, self, turbo_tasks) @@ -594,7 +618,7 @@ impl Backend for MemoryBackend { task: TaskId, index: CellId, content: CellContent, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { task.with_cell_mut(index, |cell| cell.assign(content, turbo_tasks)) @@ -605,7 +629,7 @@ impl Backend for MemoryBackend { fn run_backend_job<'a>( &'a self, id: BackendJobId, - turbo_tasks: &'a dyn TurboTasksBackendApi, + turbo_tasks: &'a dyn TurboTasksBackendApi, ) -> Pin + Send + 'a>> { // SAFETY: id will not be reused until with job is done if let Some(job) = unsafe { self.backend_jobs.take(*id) } { @@ -625,7 +649,7 @@ impl Backend for MemoryBackend { &self, mut task_type: PersistentTaskType, parent_task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { if let Some(task) = self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks) @@ -658,10 +682,19 @@ impl Backend for MemoryBackend { } } + fn connect_task( + &self, + task: TaskId, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.connect_task_child(parent_task, task, turbo_tasks); + } + fn create_transient_task( &self, task_type: TransientTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { let id = turbo_tasks.get_fresh_task_id(); // use INITIAL_SCOPE @@ -719,7 +752,11 @@ impl Job { } } - async fn run(self, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi) { + async fn run( + self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { match self { Job::RemoveFromScopes(tasks, scopes) => { for task in tasks { diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index 481e1b4efc558d..08658286902fa3 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -134,7 +134,7 @@ enum BackgroundJob { DeactivatePersisted(TaskId), } -pub struct MemoryBackendWithPersistedGraph { +pub struct MemoryBackendWithPersistedGraph { pub pg: P, tasks: NoMoveVec, cache: DashMap, @@ -189,7 +189,7 @@ impl MemoryBackendWithPersistedGraph

{ fn state_mut( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> (MutexGuard<'_, TaskState>, &Task) { let task_info = self.tasks.get(*task).unwrap(); let mut state = task_info.task_state.lock().unwrap(); @@ -200,7 +200,7 @@ impl MemoryBackendWithPersistedGraph

{ fn mem_state_mut( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> (MutexGuard<'_, TaskState>, &Task) { let task_info = self.tasks.get(*task).unwrap(); loop { @@ -234,7 +234,7 @@ impl MemoryBackendWithPersistedGraph

{ task: TaskId, task_info: &Task, task_state: &mut TaskState, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { if task_state.memory.is_none() && task_state.persisted.is_none() { if let TaskType::Persistent(_) = &task_info.task_type { @@ -257,7 +257,7 @@ impl MemoryBackendWithPersistedGraph

{ task: TaskId, task_state: &mut TaskState, delayed_activate: &mut Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { if task_state.memory.is_none() { if let Some((data, state)) = self.pg_read(task, turbo_tasks) { @@ -317,7 +317,7 @@ impl MemoryBackendWithPersistedGraph

{ fn lookup( &self, task_type: &PersistentTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { for i in 0..task_type.len() { let partial = task_type.partial(i); @@ -336,7 +336,12 @@ impl MemoryBackendWithPersistedGraph

{ self.pg_lookup_one(task_type, turbo_tasks) } - fn connect(&self, parent_task: TaskId, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn connect( + &self, + parent_task: TaskId, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { // connect() will never be called concurrently for the same parent_task // therefore it's safe to add the task into children before incrementing // active_parents. @@ -361,7 +366,7 @@ impl MemoryBackendWithPersistedGraph

{ &self, parent_task: TaskId, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { // The active_parents count was already initialized with 1 // When this was incorrect, we need to revert that. @@ -385,7 +390,11 @@ impl MemoryBackendWithPersistedGraph

{ } } - fn schedule_background_job(&self, job: BackgroundJob, turbo_tasks: &dyn TurboTasksBackendApi) { + fn schedule_background_job( + &self, + job: BackgroundJob, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { let id = self.background_job_id_factory.get(); // SAFETY: It's a fresh id unsafe { @@ -394,7 +403,11 @@ impl MemoryBackendWithPersistedGraph

{ turbo_tasks.schedule_backend_background_job(id); } - fn activate_persisted(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn activate_persisted( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { if let Some(ActivateResult { keeps_external_active, external, @@ -425,7 +438,11 @@ impl MemoryBackendWithPersistedGraph

{ } } - fn deactivate_persisted(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn deactivate_persisted( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { if let Some(DeactivateResult { more_tasks_to_deactivate, }) = self.pg_deactivate_when_needed(task, turbo_tasks) @@ -445,7 +462,7 @@ impl MemoryBackendWithPersistedGraph

{ &self, task: TaskId, by: u32, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let mut delayed_activate = Vec::new(); self.try_increment_active_parents(task, true, by, &mut delayed_activate, turbo_tasks); @@ -469,7 +486,7 @@ impl MemoryBackendWithPersistedGraph

{ force: bool, by: u32, delayed_activate: &mut Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let task_info = self.tasks.get(*task).unwrap(); let prev = task_info.active_parents.fetch_add(by, Ordering::Relaxed); @@ -496,7 +513,7 @@ impl MemoryBackendWithPersistedGraph

{ task: TaskId, state: MutexGuard, task_info: &Task, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let mut delayed_activate = Vec::new(); self.activate_task_inner(task, state, task_info, &mut delayed_activate, turbo_tasks); @@ -520,7 +537,7 @@ impl MemoryBackendWithPersistedGraph

{ mut state: MutexGuard, task_info: &Task, delayed_activate: &mut Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let TaskState { ref mut active, @@ -586,7 +603,7 @@ impl MemoryBackendWithPersistedGraph

{ &self, task: TaskId, by: u32, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { self.decrement_active_parents_limited(&[(task, by)], 0, turbo_tasks); } @@ -595,7 +612,7 @@ impl MemoryBackendWithPersistedGraph

{ &self, tasks: &[(TaskId, u32)], remaining_depth: u8, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let mut delayed_deactivate = Vec::new(); for (task, by) in tasks { @@ -621,7 +638,7 @@ impl MemoryBackendWithPersistedGraph

{ by: u32, remaining_depth: u8, delayed_deactivate: &mut Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let task_info = self.tasks.get(*task).unwrap(); let prev = task_info.active_parents.fetch_sub(by, Ordering::Relaxed); @@ -656,7 +673,7 @@ impl MemoryBackendWithPersistedGraph

{ task: TaskId, state: MutexGuard, task_info: &Task, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let mut delayed_deactivate = Vec::new(); self.deactivate_task_inner( @@ -675,7 +692,11 @@ impl MemoryBackendWithPersistedGraph

{ } } - fn deactivate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi) { + fn deactivate_tasks( + &self, + tasks: &[TaskId], + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { let mut delayed_deactivate = Vec::new(); for task in tasks { let (state, task_info) = self.state_mut(*task, turbo_tasks); @@ -703,7 +724,7 @@ impl MemoryBackendWithPersistedGraph

{ task_info: &Task, remaining_depth: u8, delayed_deactivate: &mut Vec, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let mut deactivate_persisted = false; let TaskState { @@ -738,7 +759,10 @@ impl MemoryBackendWithPersistedGraph

{ } } - fn persist(&self, turbo_tasks: &dyn TurboTasksBackendApi) -> bool { + fn persist( + &self, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) -> bool { loop { if let Ok(mut task) = self.persist_queue1.pop() { self.persist_queue1_queued.remove(&task); @@ -879,7 +903,11 @@ impl MemoryBackendWithPersistedGraph

{ .any(|q| !q.lock().unwrap().is_empty()) } - fn increase_persist_workers(&self, n: usize, turbo_tasks: &dyn TurboTasksBackendApi) { + fn increase_persist_workers( + &self, + n: usize, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { loop { let capacity = self.persist_capacity.load(Ordering::Acquire); if capacity == 0 { @@ -905,7 +933,7 @@ impl MemoryBackendWithPersistedGraph

{ } impl Backend for MemoryBackendWithPersistedGraph

{ - fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi>) { let (tasks_to_activate, tasks_to_deactivate) = self.pg_get_pending_active_update(turbo_tasks); let tasks = self.pg_get_active_external_tasks(turbo_tasks); @@ -975,11 +1003,15 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } - fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi>) { self.pg_stop(turbo_tasks); } - fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn invalidate_task( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { let (mut state, _) = self.state_mut(task, turbo_tasks); if let Some(MemoryTaskState { freshness, .. }) = &mut state.memory { @@ -1002,7 +1034,11 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } - fn invalidate_tasks(&self, tasks: Vec, turbo_tasks: &dyn TurboTasksBackendApi) { + fn invalidate_tasks( + &self, + tasks: Vec, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { for task in tasks { self.invalidate_task(task, turbo_tasks); } @@ -1025,7 +1061,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ fn try_start_task_execution( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { let (mut state, task_info) = self.mem_state_mut(task, turbo_tasks); let mem_state = state.memory.as_mut().unwrap(); @@ -1078,7 +1114,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task: TaskId, result: Result, Option>>, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let (mut state, _task_info) = self.mem_state_mut(task, turbo_tasks); let TaskState { ref mut memory, .. } = *state; @@ -1113,7 +1149,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ duration: Duration, _instant: Instant, _stateful: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { #[cfg(feature = "log_running_tasks")] { @@ -1174,7 +1210,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ fn run_backend_job<'a>( &'a self, id: BackendJobId, - turbo_tasks: &'a dyn TurboTasksBackendApi, + turbo_tasks: &'a dyn TurboTasksBackendApi>, ) -> Pin + Send + 'a>> { if id == self.persist_job { return Box::pin(async { @@ -1220,7 +1256,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ task: TaskId, reader: TaskId, _strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Result> { let (mut state, _task_info) = self.mem_state_mut(task, turbo_tasks); let TaskState { @@ -1259,7 +1295,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task: TaskId, _strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Result> { let (state, task_info) = self.mem_state_mut(task, turbo_tasks); let mem_state = state.memory.as_ref().unwrap(); @@ -1280,7 +1316,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ task: TaskId, index: CellId, reader: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Result> { let (mut state, _task_info) = self.mem_state_mut(task, turbo_tasks); let TaskState { @@ -1345,7 +1381,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task: TaskId, index: CellId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Result> { let (mut state, _) = self.mem_state_mut(task, turbo_tasks); let TaskState { @@ -1383,7 +1419,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task: TaskId, index: CellId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Result { let (state, _) = self.mem_state_mut(task, turbo_tasks); let mem_state = state.memory.as_ref().unwrap(); @@ -1402,7 +1438,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ _task: TaskId, _trait_id: TraitTypeId, _reader: TaskId, - _turbo_tasks: &dyn TurboTasksBackendApi, + _turbo_tasks: &dyn TurboTasksBackendApi>, ) -> RawVcSetVc { todo!() } @@ -1412,7 +1448,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ _trait_id: TraitTypeId, _collectible: RawVc, _task: TaskId, - _turbo_tasks: &dyn TurboTasksBackendApi, + _turbo_tasks: &dyn TurboTasksBackendApi>, ) { todo!() } @@ -1422,7 +1458,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ _trait_id: TraitTypeId, _collectible: RawVc, _task: TaskId, - _turbo_tasks: &dyn TurboTasksBackendApi, + _turbo_tasks: &dyn TurboTasksBackendApi>, ) { todo!() } @@ -1432,7 +1468,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ task: TaskId, index: CellId, content: CellContent, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) { let (mut state, task_info) = self.mem_state_mut(task, turbo_tasks); let TaskState { @@ -1473,7 +1509,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task_type: PersistentTaskType, parent_task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { if let Some(task) = self.cache.get(&task_type) { self.connect(parent_task, *task, turbo_tasks); @@ -1526,10 +1562,19 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } + fn connect_task( + &self, + _task: TaskId, + _parent_task: TaskId, + _turbo_tasks: &dyn TurboTasksBackendApi>, + ) { + todo!() + } + fn create_transient_task( &self, task_type: TransientTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { let task = turbo_tasks.get_fresh_task_id(); let task = task.into(); @@ -1556,9 +1601,9 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } -struct MemoryBackendPersistedGraphApi<'a, P: PersistedGraph> { +struct MemoryBackendPersistedGraphApi<'a, P: PersistedGraph + 'static> { backend: &'a MemoryBackendWithPersistedGraph

, - turbo_tasks: &'a dyn TurboTasksBackendApi, + turbo_tasks: &'a dyn TurboTasksBackendApi>, } impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi<'a, P> { @@ -1607,7 +1652,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_read( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option<(TaskData, ReadTaskState)> { self.pg .read( @@ -1623,7 +1668,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_lookup_one( &self, task_type: &PersistentTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { self.pg .lookup_one( @@ -1639,7 +1684,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_lookup( &self, partial_task_type: &PersistentTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { self.pg .lookup( @@ -1652,7 +1697,11 @@ impl MemoryBackendWithPersistedGraph

{ .unwrap() } - fn pg_is_persisted(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) -> bool { + fn pg_is_persisted( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) -> bool { self.pg .is_persisted( task, @@ -1669,7 +1718,7 @@ impl MemoryBackendWithPersistedGraph

{ task: TaskId, data: TaskData, state: PersistTaskState, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { self.pg .persist( @@ -1688,7 +1737,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_activate_when_needed( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { self.pg .activate_when_needed( @@ -1705,7 +1754,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_deactivate_when_needed( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Option { self.pg .deactivate_when_needed( @@ -1722,7 +1771,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_set_externally_active( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { self.pg .set_externally_active( @@ -1739,7 +1788,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_unset_externally_active( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { self.pg .unset_externally_active( @@ -1753,7 +1802,11 @@ impl MemoryBackendWithPersistedGraph

{ } #[must_use] - fn pg_make_dirty(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) -> bool { + fn pg_make_dirty( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) -> bool { self.pg .make_dirty( task, @@ -1769,7 +1822,7 @@ impl MemoryBackendWithPersistedGraph

{ fn pg_make_dependent_dirty( &self, vc: RawVc, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Vec { self.pg .make_dependent_dirty( @@ -1782,7 +1835,11 @@ impl MemoryBackendWithPersistedGraph

{ .unwrap() } - fn pg_make_clean(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi) { + fn pg_make_clean( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { self.pg .make_clean( task, @@ -1799,7 +1856,7 @@ impl MemoryBackendWithPersistedGraph

{ #[must_use] fn pg_remove_outdated_externally_active( &self, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> Vec { self.pg .remove_outdated_externally_active(&MemoryBackendPersistedGraphApi { @@ -1810,7 +1867,10 @@ impl MemoryBackendWithPersistedGraph

{ } #[must_use] - fn pg_get_active_external_tasks(&self, turbo_tasks: &dyn TurboTasksBackendApi) -> Vec { + fn pg_get_active_external_tasks( + &self, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) -> Vec { self.pg .get_active_external_tasks(&MemoryBackendPersistedGraphApi { backend: self, @@ -1820,7 +1880,10 @@ impl MemoryBackendWithPersistedGraph

{ } #[must_use] - fn pg_get_dirty_active_tasks(&self, turbo_tasks: &dyn TurboTasksBackendApi) -> Vec { + fn pg_get_dirty_active_tasks( + &self, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) -> Vec { self.pg .get_dirty_active_tasks(&MemoryBackendPersistedGraphApi { backend: self, @@ -1832,7 +1895,7 @@ impl MemoryBackendWithPersistedGraph

{ #[must_use] fn pg_get_pending_active_update( &self, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi>, ) -> (Vec, Vec) { self.pg .get_pending_active_update(&MemoryBackendPersistedGraphApi { @@ -1842,7 +1905,7 @@ impl MemoryBackendWithPersistedGraph

{ .unwrap() } - fn pg_stop(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + fn pg_stop(&self, turbo_tasks: &dyn TurboTasksBackendApi>) { self.pg .stop(&MemoryBackendPersistedGraphApi { backend: self, diff --git a/crates/turbo-tasks-memory/src/output.rs b/crates/turbo-tasks-memory/src/output.rs index cdca2be4a209d3..c832db6c55a7f5 100644 --- a/crates/turbo-tasks-memory/src/output.rs +++ b/crates/turbo-tasks-memory/src/output.rs @@ -9,6 +9,8 @@ use auto_hash_map::AutoSet; use nohash_hasher::BuildNoHashHasher; use turbo_tasks::{util::SharedError, RawVc, TaskId, TurboTasksBackendApi}; +use crate::MemoryBackend; + #[derive(Default, Debug)] pub struct Output { pub(crate) content: OutputContent, @@ -60,12 +62,12 @@ impl Output { } } - pub fn link(&mut self, target: RawVc, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn link(&mut self, target: RawVc, turbo_tasks: &dyn TurboTasksBackendApi) { debug_assert!(*self != target); self.assign(OutputContent::Link(target), turbo_tasks) } - pub fn error(&mut self, error: Error, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn error(&mut self, error: Error, turbo_tasks: &dyn TurboTasksBackendApi) { self.content = OutputContent::Error(SharedError::new(error)); self.updates += 1; // notify @@ -77,7 +79,7 @@ impl Output { pub fn panic( &mut self, message: Option>, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.content = OutputContent::Panic(message); self.updates += 1; @@ -87,7 +89,11 @@ impl Output { } } - pub fn assign(&mut self, content: OutputContent, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn assign( + &mut self, + content: OutputContent, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { self.content = content; self.updates += 1; // notify @@ -100,7 +106,7 @@ impl Output { &self.dependent_tasks } - pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { + pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { // notify if !self.dependent_tasks.is_empty() { turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks); diff --git a/crates/turbo-tasks-memory/src/scope.rs b/crates/turbo-tasks-memory/src/scope.rs index 92a9f3163414b6..a15ebf8ae55ec6 100644 --- a/crates/turbo-tasks-memory/src/scope.rs +++ b/crates/turbo-tasks-memory/src/scope.rs @@ -115,6 +115,8 @@ impl<'a> Iterator for TaskScopesIterator<'a> { pub struct TaskScope { #[cfg(feature = "print_scope_updates")] pub id: TaskScopeId, + /// If true, this scope will propagate collectibles to parent scopes + propagate_collectibles: bool, /// Total number of tasks tasks: AtomicUsize, /// Number of tasks that are not Done, unfinished child scopes also count as @@ -129,6 +131,21 @@ pub struct TaskScope { pub state: Mutex, } +#[derive(Debug, Default)] +struct ScopeCollectiblesInfo { + collectibles: CountHashSet, + dependent_tasks: AutoSet>, + read_collectibles_task: Option, +} + +impl ScopeCollectiblesInfo { + fn is_unset(&self) -> bool { + self.collectibles.is_unset() + && self.dependent_tasks.is_empty() + && self.read_collectibles_task.is_none() + } +} + #[derive(Debug)] pub struct TaskScopeState { #[cfg(feature = "print_scope_updates")] @@ -153,14 +170,7 @@ pub struct TaskScopeState { /// When they change these tasks are invalidated dependent_tasks: AutoSet>, /// Emitted collectibles with count and dependent_tasks by trait type - collectibles: AutoMap< - TraitTypeId, - ( - CountHashSet, - AutoSet>, - ), - BuildNoHashHasher, - >, + collectibles: AutoMap>, } impl TaskScope { @@ -169,25 +179,30 @@ impl TaskScope { Self { #[cfg(feature = "print_scope_updates")] id, + propagate_collectibles: true, tasks: AtomicUsize::new(tasks), unfinished_tasks: AtomicIsize::new(0), - state: Mutex::new(TaskScopeState { + state: Mutex::new(TaskScopeState::new( #[cfg(feature = "print_scope_updates")] id, - active: 0, - dirty_tasks: AutoSet::default(), - children: CountHashSet::new(), - collectibles: AutoMap::default(), - dependent_tasks: AutoSet::default(), - event: Event::new(move || { - #[cfg(feature = "print_scope_updates")] - return format!("TaskScope({id})::event"); - #[cfg(not(feature = "print_scope_updates"))] - return "TaskScope::event".to_owned(); - }), - has_unfinished_tasks: false, - parents: CountHashSet::new(), - }), + false, + )), + } + } + + #[allow(unused_variables)] + pub fn new_no_collectibles(id: TaskScopeId, tasks: usize) -> Self { + Self { + #[cfg(feature = "print_scope_updates")] + id, + propagate_collectibles: false, + tasks: AtomicUsize::new(tasks), + unfinished_tasks: AtomicIsize::new(tasks as isize), + state: Mutex::new(TaskScopeState::new( + #[cfg(feature = "print_scope_updates")] + id, + tasks > 0, + )), } } @@ -196,25 +211,14 @@ impl TaskScope { Self { #[cfg(feature = "print_scope_updates")] id, + propagate_collectibles: true, tasks: AtomicUsize::new(tasks), unfinished_tasks: AtomicIsize::new(unfinished as isize), - state: Mutex::new(TaskScopeState { + state: Mutex::new(TaskScopeState::new_active( #[cfg(feature = "print_scope_updates")] id, - active: 1, - dirty_tasks: AutoSet::default(), - children: CountHashSet::new(), - collectibles: AutoMap::default(), - dependent_tasks: AutoSet::default(), - event: Event::new(move || { - #[cfg(feature = "print_scope_updates")] - return format!("TaskScope({id})::event"); - #[cfg(not(feature = "print_scope_updates"))] - return "TaskScope::event".to_owned(); - }), - has_unfinished_tasks: false, - parents: CountHashSet::new(), - }), + tasks > 0, + )), } } @@ -336,21 +340,28 @@ impl TaskScope { self_id: TaskScopeId, trait_id: TraitTypeId, reader: TaskId, - ) -> (CountHashSet, Vec) { + ) -> Result<(CountHashSet, Vec), EventListener> { let mut state = self.state.lock(); + if state.has_unfinished_tasks { + return Err(state.event.listen()); + } let children = state.children.iter().copied().collect::>(); state.dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeChildren(self_id)); let current = { - let (c, dependent_tasks) = state.collectibles.entry(trait_id).or_default(); + let ScopeCollectiblesInfo { + collectibles, + dependent_tasks, + .. + } = state.collectibles.entry(trait_id).or_default(); dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeCollectibles(self_id, trait_id)); - c.clone() + collectibles.clone() }; drop(state); - (current, children) + Ok((current, children)) } pub(crate) fn remove_dependent_task(&self, reader: TaskId) { @@ -365,14 +376,18 @@ impl TaskScope { ) { let mut state = self.state.lock(); if let Entry::Occupied(mut entry) = state.collectibles.entry(trait_type) { - let (collectibles, dependent_tasks) = entry.get_mut(); - dependent_tasks.remove(&reader); - if collectibles.is_unset() && dependent_tasks.is_empty() { + let info = entry.get_mut(); + info.dependent_tasks.remove(&reader); + if info.is_unset() { entry.remove(); } } } + pub(crate) fn is_propagating_collectibles(&self) -> bool { + self.propagate_collectibles + } + pub(crate) fn assert_unused(&self) { // This method checks if everything was cleaned up correctly // no more tasks should be attached to this scope in any way @@ -393,11 +408,12 @@ impl TaskScope { "Scope dependent tasks not correctly cleaned up: {:?}", state.dependent_tasks ); - assert!( - state.collectibles.is_empty(), - "Scope collectibles not correctly cleaned up: {:?}", - state.collectibles - ); + // TODO read_collectibles_tasks need to be cleaned up + // assert!( + // state.collectibles.is_empty(), + // "Scope collectibles not correctly cleaned up: {:?}", + // state.collectibles + // ); // assert!( // state.dirty_tasks.is_empty(), // "Scope dirty tasks not correctly cleaned up: {:?}", @@ -444,9 +460,59 @@ pub struct ScopeCollectibleChangeEffect { } impl TaskScopeState { + /// creates a state that is not active + fn new( + #[cfg(feature = "print_scope_updates")] id: TaskScopeId, + has_unfinished_tasks: bool, + ) -> Self { + Self { + #[cfg(feature = "print_scope_updates")] + id, + active: 0, + dirty_tasks: AutoSet::default(), + children: CountHashSet::new(), + collectibles: AutoMap::default(), + dependent_tasks: AutoSet::default(), + event: Event::new(move || { + #[cfg(feature = "print_scope_updates")] + return format!("TaskScope({id})::event"); + #[cfg(not(feature = "print_scope_updates"))] + return "TaskScope::event".to_owned(); + }), + has_unfinished_tasks, + parents: CountHashSet::new(), + } + } + + /// creates a state that is active + fn new_active( + #[cfg(feature = "print_scope_updates")] id: TaskScopeId, + has_unfinished_tasks: bool, + ) -> Self { + Self { + #[cfg(feature = "print_scope_updates")] + id, + active: 1, + dirty_tasks: AutoSet::default(), + children: CountHashSet::new(), + collectibles: AutoMap::default(), + dependent_tasks: AutoSet::default(), + event: Event::new(move || { + #[cfg(feature = "print_scope_updates")] + return format!("TaskScope({id})::event"); + #[cfg(not(feature = "print_scope_updates"))] + return "TaskScope::event".to_owned(); + }), + has_unfinished_tasks, + parents: CountHashSet::new(), + } + } + + /// returns true if the scope is active pub fn is_active(&self) -> bool { self.active > 0 } + /// increments the active counter, returns list of tasks that need to be /// scheduled and list of child scope that need to be incremented after /// releasing the scope lock @@ -457,6 +523,7 @@ impl TaskScopeState { ) -> Option>> { self.increment_active_by(1, more_jobs) } + /// increments the active counter, returns list of tasks that need to be /// scheduled and list of child scope that need to be incremented after /// releasing the scope lock @@ -475,11 +542,13 @@ impl TaskScopeState { None } } + /// decrement the active counter, returns list of child scopes that need to /// be decremented after releasing the scope lock pub fn decrement_active(&mut self, more_jobs: &mut Vec) { self.decrement_active_by(1, more_jobs); } + /// decrement the active counter, returns list of child scopes that need to /// be decremented after releasing the scope lock. Returns `true` when the /// scope has become inactive. @@ -564,13 +633,11 @@ impl TaskScopeState { let mut set = self.take_dependent_tasks(); self.collectibles = take(&mut self.collectibles) .into_iter() - .map(|(key, (collectibles, mut dependent_tasks))| { - set.extend(take(&mut dependent_tasks)); - (key, (collectibles, dependent_tasks)) - }) - .filter(|(_, (collectibles, dependent_tasks))| { - !collectibles.is_unset() || !dependent_tasks.is_empty() + .map(|(key, mut info)| { + set.extend(take(&mut info.dependent_tasks)); + (key, info) }) + .filter(|(_, info)| !info.is_unset()) .collect(); set } @@ -581,7 +648,6 @@ impl TaskScopeState { #[must_use] pub fn add_collectible( &mut self, - trait_id: TraitTypeId, collectible: RawVc, ) -> Option { @@ -600,14 +666,14 @@ impl TaskScopeState { ) -> Option { match self.collectibles.entry(trait_id) { Entry::Occupied(mut entry) => { - let (collectibles, dependent_tasks) = entry.get_mut(); - if collectibles.add_count(collectible, count) { + let info = entry.get_mut(); + if info.collectibles.add_count(collectible, count) { log_scope_update!("add_collectible {} -> {}", *self.id, collectible); Some(ScopeCollectibleChangeEffect { - notify: take(dependent_tasks), + notify: take(&mut info.dependent_tasks), }) } else { - if collectibles.is_unset() && dependent_tasks.is_empty() { + if info.is_unset() { entry.remove(); } None @@ -616,7 +682,7 @@ impl TaskScopeState { Entry::Vacant(entry) => { let result = entry .insert(Default::default()) - .0 + .collectibles .add_count(collectible, count); debug_assert!(result, "this must be always a new entry"); log_scope_update!("add_collectible {} -> {}", *self.id, collectible); @@ -651,15 +717,15 @@ impl TaskScopeState { ) -> Option { match self.collectibles.entry(trait_id) { Entry::Occupied(mut entry) => { - let (collectibles, dependent_tasks) = entry.get_mut(); - let old_value = collectibles.get(&collectible); + let info = entry.get_mut(); + let old_value = info.collectibles.get(&collectible); let new_value = old_value - count as isize; // NOTE: The read_collectibles need to be invalidated when negative count // changes. Each negative count will eliminate one child scope emitted // collectible. So changing from -1 to -2 might affect the visible collectibles. - if collectibles.remove_count(collectible, count) || new_value < 0 { - let notify = take(dependent_tasks); - if collectibles.is_unset() { + if info.collectibles.remove_count(collectible, count) || new_value < 0 { + let notify = take(&mut info.dependent_tasks); + if info.is_unset() { entry.remove(); } log_scope_update!( @@ -675,7 +741,7 @@ impl TaskScopeState { Entry::Vacant(e) => { let result = e .insert(Default::default()) - .0 + .collectibles .remove_count(collectible, count); debug_assert!(!result, "this must never be visible from outside"); @@ -687,4 +753,23 @@ impl TaskScopeState { pub fn take_dependent_tasks(&mut self) -> AutoSet> { take(&mut self.dependent_tasks) } + + pub fn get_read_collectibles_task( + &mut self, + trait_id: TraitTypeId, + create_new: impl FnOnce() -> TaskId, + ) -> TaskId { + let task_id = &mut self + .collectibles + .entry(trait_id) + .or_default() + .read_collectibles_task; + if let Some(task_id) = *task_id { + task_id + } else { + let new_task_id = create_new(); + *task_id = Some(new_task_id); + new_task_id + } + } } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 1a9e7722fc2044..d8b8cdf44b1ebc 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -67,6 +67,16 @@ task_local! { type OnceTaskFn = Mutex> + Send + 'static>>>>; +struct ReadTaskCollectiblesTaskType { + task: TaskId, + trait_type: TraitTypeId, +} + +struct ReadScopeCollectiblesTaskType { + scope: TaskScopeId, + trait_type: TraitTypeId, +} + /// Different Task types enum TaskType { /// A root task that will track dependencies and re-execute when @@ -83,9 +93,15 @@ enum TaskType { Once(OnceTaskFn), /// A task that reads all collectibles of a certain trait from a - /// [TaskScope]. It will do that by recursively calling ReadCollectibles on - /// child scopes, so that results by scope are cached. - ReadCollectibles(TaskScopeId, TraitTypeId), + /// [TaskScope]. It will do that by recursively calling + /// ReadScopeCollectibles on child scopes, so that results by scope are + /// cached. + ReadScopeCollectibles(Box), + + /// A task that reads all collectibles of a certain trait from another task. + /// It will do that by recursively calling ReadScopeCollectibles on child + /// scopes, so that results by task are cached. + ReadTaskCollectibles(Box), /// A normal persistent task Persistent(Arc), @@ -94,7 +110,8 @@ enum TaskType { enum TaskTypeForDescription { Root, Once, - ReadCollectibles(TraitTypeId), + ReadTaskCollectibles(TraitTypeId), + ReadScopeCollectibles(TraitTypeId), Persistent(Arc), } @@ -103,7 +120,13 @@ impl TaskTypeForDescription { match task_type { TaskType::Root(..) => Self::Root, TaskType::Once(..) => Self::Once, - TaskType::ReadCollectibles(.., trait_id) => Self::ReadCollectibles(*trait_id), + TaskType::ReadTaskCollectibles(box ReadTaskCollectiblesTaskType { + trait_type, .. + }) => Self::ReadTaskCollectibles(*trait_type), + TaskType::ReadScopeCollectibles(box ReadScopeCollectiblesTaskType { + trait_type, + .. + }) => Self::ReadScopeCollectibles(*trait_type), TaskType::Persistent(ty) => Self::Persistent(ty.clone()), } } @@ -114,10 +137,18 @@ impl Debug for TaskType { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), - Self::ReadCollectibles(scope_id, trait_id) => f - .debug_tuple("ReadCollectibles") - .field(scope_id) - .field(®istry::get_trait(*trait_id).name) + Self::ReadScopeCollectibles(box ReadScopeCollectiblesTaskType { + scope, + trait_type, + }) => f + .debug_tuple("ReadScopeCollectibles") + .field(scope) + .field(®istry::get_trait(*trait_type).name) + .finish(), + Self::ReadTaskCollectibles(box ReadTaskCollectiblesTaskType { task, trait_type }) => f + .debug_tuple("ReadTaskCollectibles") + .field(task) + .field(®istry::get_trait(*trait_type).name) .finish(), Self::Persistent(ty) => Debug::fmt(ty, f), } @@ -129,7 +160,8 @@ impl Display for TaskType { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), - Self::ReadCollectibles(..) => f.debug_tuple("ReadCollectibles").finish(), + Self::ReadTaskCollectibles(..) => f.debug_tuple("ReadTaskCollectibles").finish(), + Self::ReadScopeCollectibles(..) => f.debug_tuple("ReadScopeCollectibles").finish(), Self::Persistent(ty) => Display::fmt(ty, f), } } @@ -253,6 +285,29 @@ impl TaskState { last_waiting_task: Default::default(), } } + + fn new_root_scoped( + description: impl Fn() -> String + Send + Sync + 'static, + scope: TaskScopeId, + stats_type: StatsType, + ) -> Self { + Self { + scopes: TaskScopes::Root(scope), + state_type: Dirty { + event: Event::new(move || format!("TaskState({})::event", description())), + }, + stateful: false, + children: Default::default(), + collectibles: Default::default(), + output: Default::default(), + prepared_type: PrepareTaskType::None, + cells: Default::default(), + gc: Default::default(), + stats: TaskStats::new(stats_type), + #[cfg(feature = "track_wait_dependencies")] + last_waiting_task: Default::default(), + } + } } /// The partial task state. It's equal to a full TaskState with state = Dirty @@ -324,8 +379,9 @@ impl UnloadedTaskState { } } -/// Keeps track of emitted and unemitted collectibles. Defaults to None to avoid -/// allocating memory for two empty hashsets when no collectibles are emitted. +/// Keeps track of emitted and unemitted collectibles and the +/// read_collectibles tasks. Defaults to None to avoid allocating memory when no +/// collectibles are emitted or read. #[derive(Default)] struct MaybeCollectibles { inner: Option>, @@ -336,12 +392,21 @@ struct MaybeCollectibles { struct Collectibles { emitted: AutoSet<(TraitTypeId, RawVc)>, unemitted: AutoSet<(TraitTypeId, RawVc)>, + read_collectibles_tasks: AutoMap, } impl MaybeCollectibles { /// Consumes the collectibles (if any) and return them. - fn take(&mut self) -> Option> { - self.inner.take() + fn take_collectibles(&mut self) -> Option { + if let Some(inner) = &mut self.inner { + Some(Collectibles { + emitted: take(&mut inner.emitted), + unemitted: take(&mut inner.unemitted), + read_collectibles_tasks: AutoMap::default(), + }) + } else { + None + } } /// Consumes the collectibles (if any) and return them. @@ -373,6 +438,19 @@ impl MaybeCollectibles { .unemitted .insert((trait_type, value)) } + + pub fn get_read_collectibles_task( + &mut self, + trait_id: TraitTypeId, + create_new: impl FnOnce() -> TaskId, + ) -> TaskId { + *self + .inner + .get_or_insert_default() + .read_collectibles_tasks + .entry(trait_id) + .or_insert_with(create_new) + } } enum TaskStateType { @@ -491,13 +569,16 @@ impl Task { } } - pub(crate) fn new_read_collectibles( + pub(crate) fn new_read_scope_collectibles( id: TaskId, target_scope: TaskScopeId, trait_type_id: TraitTypeId, stats_type: StatsType, ) -> Self { - let ty = TaskType::ReadCollectibles(target_scope, trait_type_id); + let ty = TaskType::ReadScopeCollectibles(box ReadScopeCollectiblesTaskType { + scope: target_scope, + trait_type: trait_type_id, + }); let description = Self::get_event_description_static(id, &ty); Self { id, @@ -509,10 +590,34 @@ impl Task { } } + pub(crate) fn new_read_task_collectibles( + id: TaskId, + scope: TaskScopeId, + target_task: TaskId, + trait_type_id: TraitTypeId, + stats_type: StatsType, + ) -> Self { + let ty = TaskType::ReadTaskCollectibles(box ReadTaskCollectiblesTaskType { + task: target_task, + trait_type: trait_type_id, + }); + let description = Self::get_event_description_static(id, &ty); + Self { + id, + ty, + state: RwLock::new(TaskMetaState::Full(box TaskState::new_root_scoped( + description, + scope, + stats_type, + ))), + } + } + pub(crate) fn is_pure(&self) -> bool { match &self.ty { TaskType::Persistent(_) => true, - TaskType::ReadCollectibles(..) => true, + TaskType::ReadTaskCollectibles(..) => true, + TaskType::ReadScopeCollectibles(..) => true, TaskType::Root(_) => false, TaskType::Once(_) => false, } @@ -539,8 +644,13 @@ impl Task { match ty { TaskTypeForDescription::Root => format!("[{}] root", id), TaskTypeForDescription::Once => format!("[{}] once", id), - TaskTypeForDescription::ReadCollectibles(trait_type_id) => format!( - "[{}] read collectibles({})", + TaskTypeForDescription::ReadTaskCollectibles(trait_type_id) => format!( + "[{}] read task collectibles({})", + id, + registry::get_trait(*trait_type_id).name + ), + TaskTypeForDescription::ReadScopeCollectibles(trait_type_id) => format!( + "[{}] read scope collectibles({})", id, registry::get_trait(*trait_type_id).name ), @@ -660,7 +770,7 @@ impl Task { pub(crate) fn execute( self: &Task, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option { let mut state = self.full_state_mut(); if !self.try_start_execution(&mut state, turbo_tasks, backend) { @@ -675,7 +785,7 @@ impl Task { fn try_start_execution( &self, state: &mut TaskState, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, backend: &MemoryBackend, ) -> bool { match state.state_type { @@ -697,7 +807,7 @@ impl Task { let set = take(&mut state.children); remove_from_scopes(set, &state.scopes, backend, turbo_tasks); } - if let Some(collectibles) = state.collectibles.take() { + if let Some(collectibles) = state.collectibles.take_collectibles() { remove_collectible_from_scopes( collectibles.emitted, collectibles.unemitted, @@ -723,7 +833,7 @@ impl Task { self: &Task, mut state: FullTaskWriteGuard<'_>, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Pin> + Send>> { match &self.ty { TaskType::Root(bound_fn) => { @@ -734,15 +844,32 @@ impl Task { drop(state); mutex.lock().take().expect("Task can only be executed once") } - &TaskType::ReadCollectibles(scope_id, trait_type_id) => { + &TaskType::ReadTaskCollectibles(box ReadTaskCollectiblesTaskType { + task: task_id, + trait_type, + }) => { + // Connect the task to the current task. This makes strongly consistent behaving + // as expected and we can look up the collectibles in the current scope. + self.connect_child_internal(state, task_id, backend, &*turbo_tasks); + // state was dropped by previous method + Box::pin(Self::execute_read_task_collectibles( + self.id, + task_id, + trait_type, + turbo_tasks.pin(), + )) + } + &TaskType::ReadScopeCollectibles(box ReadScopeCollectiblesTaskType { + scope, + trait_type, + }) => { drop(state); - Task::make_read_collectibles_execution_future( + Box::pin(Self::execute_read_scope_collectibles( self.id, - scope_id, - trait_type_id, - backend, - turbo_tasks, - ) + scope, + trait_type, + turbo_tasks.pin(), + )) } TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, inputs) => { @@ -789,7 +916,7 @@ impl Task { &self, result: Result, Option>>, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut state = self.full_state_mut(); match state.state_type { @@ -841,7 +968,7 @@ impl Task { instant: Instant, stateful: bool, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { let mut schedule_task = false; let mut dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take()); @@ -950,7 +1077,11 @@ impl Task { false } - fn make_dirty(&self, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi) { + fn make_dirty( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { self.make_dirty_internal(false, backend, turbo_tasks); } @@ -958,7 +1089,7 @@ impl Task { &self, force_schedule: bool, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { if let TaskType::Once(_) = self.ty { // once task won't become dirty @@ -1024,7 +1155,7 @@ impl Task { pub(crate) fn schedule_when_dirty_from_scope( &self, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut state = self.full_state_mut(); if let TaskStateType::Dirty { ref mut event } = state.state_type { @@ -1046,7 +1177,7 @@ impl Task { id: TaskScopeId, merging_scopes: usize, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, queue: &mut VecDeque, ) { let mut state = self.full_state_mut(); @@ -1127,7 +1258,7 @@ impl Task { id: TaskScopeId, merging_scopes: usize, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { // VecDeque::new() would allocate with 7 items capacity. We don't want that. let mut queue = VecDeque::with_capacity(0); @@ -1141,7 +1272,7 @@ impl Task { state: &mut FullTaskWriteGuard<'_>, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { let mut schedule_self = false; backend.with_scope(id, |scope| { @@ -1192,7 +1323,7 @@ impl Task { state: &mut TaskMetaStateWriteGuard<'_>, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { match state { TaskMetaStateWriteGuard::Full(state) => { @@ -1215,7 +1346,7 @@ impl Task { state: &mut FullTaskWriteGuard<'_>, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { backend.with_scope(id, |scope| { match state.state_type { @@ -1259,7 +1390,7 @@ impl Task { &self, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, queue: &mut VecDeque, ) { let mut state = self.partial_state_mut(); @@ -1360,7 +1491,7 @@ impl Task { &self, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { // VecDeque::new() would allocate with 7 items capacity. We don't want that. let mut queue = VecDeque::with_capacity(0); @@ -1374,7 +1505,7 @@ impl Task { &self, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.remove_from_scope_internal(id, backend, turbo_tasks) } @@ -1383,7 +1514,7 @@ impl Task { &self, scopes: impl Iterator, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { for id in scopes { self.remove_from_scope_internal(id, backend, turbo_tasks) @@ -1393,7 +1524,7 @@ impl Task { fn remove_root_or_initial_scope( &self, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut state = self.full_state_mut(); match state.scopes { @@ -1429,7 +1560,7 @@ impl Task { pub fn make_root_scoped( &self, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let state = self.full_state_mut(); self.make_root_scoped_internal(state, backend, turbo_tasks); @@ -1439,7 +1570,7 @@ impl Task { &self, mut state: FullTaskWriteGuard<'a>, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option> { if matches!(state.scopes, TaskScopes::Root(_)) { return Some(state); @@ -1610,7 +1741,7 @@ impl Task { pub(crate) fn invalidate( &self, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.make_dirty(backend, turbo_tasks) } @@ -1620,7 +1751,7 @@ impl Task { pub(crate) fn recompute( &self, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { self.make_dirty_internal(true, backend, turbo_tasks) } @@ -1751,9 +1882,13 @@ impl Task { match &self.ty { TaskType::Root(_) => StatsTaskType::Root(self.id), TaskType::Once(_) => StatsTaskType::Once(self.id), - TaskType::ReadCollectibles(_, trait_type_id) => { - StatsTaskType::ReadCollectibles(*trait_type_id) - } + TaskType::ReadTaskCollectibles(box ReadTaskCollectiblesTaskType { + trait_type, .. + }) => StatsTaskType::ReadCollectibles(*trait_type), + TaskType::ReadScopeCollectibles(box ReadScopeCollectiblesTaskType { + trait_type, + .. + }) => StatsTaskType::ReadCollectibles(*trait_type), TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(f, _) => StatsTaskType::Native(*f), PersistentTaskType::ResolveNative(f, _) => StatsTaskType::ResolveNative(*f), @@ -1836,9 +1971,19 @@ impl Task { &self, child_id: TaskId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + let state = self.full_state_mut(); + self.connect_child_internal(state, child_id, backend, turbo_tasks); + } + + fn connect_child_internal( + &self, + mut state: FullTaskWriteGuard<'_>, + child_id: TaskId, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, ) { - let mut state = self.full_state_mut(); if state.children.insert(child_id) { if let TaskScopes::Inner(_, optimization_counter) = &state.scopes { if should_optimize_to_root_scoped(*optimization_counter, state.children.len()) { @@ -1882,7 +2027,7 @@ impl Task { &'a self, mut state: FullTaskWriteGuard<'a>, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> FullTaskWriteGuard<'a> { while !state.scopes.is_root() { #[cfg(not(feature = "report_expensive"))] @@ -1923,7 +2068,7 @@ impl Task { func: F, note: impl Fn() -> String + Sync + Send + 'static, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { let mut state = self.full_state_mut(); if strongly_consistent { @@ -1975,42 +2120,105 @@ impl Task { } } - fn make_read_collectibles_execution_future( + async fn execute_read_task_collectibles( + read_task_id: TaskId, task_id: TaskId, + trait_type_id: TraitTypeId, + turbo_tasks: Arc>, + ) -> Result { + Self::execute_read_collectibles( + read_task_id, + |turbo_tasks| { + let backend = turbo_tasks.backend(); + backend.with_task(task_id, |task| { + let state = + task.ensure_root_scoped(task.full_state_mut(), backend, &*turbo_tasks); + if let TaskScopes::Root(scope_id) = state.scopes { + backend.with_scope(scope_id, |scope| { + scope.read_collectibles_and_children( + scope_id, + trait_type_id, + read_task_id, + ) + }) + } else { + unreachable!(); + } + }) + }, + trait_type_id, + turbo_tasks, + ) + .await + } + + async fn execute_read_scope_collectibles( + read_task_id: TaskId, scope_id: TaskScopeId, trait_type_id: TraitTypeId, - backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Pin> + Send>> { - let (mut current, children) = backend.with_scope(scope_id, |scope| { - scope.read_collectibles_and_children(scope_id, trait_type_id, task_id) - }); - log_scope_update!("reading collectibles from {scope_id}: {:?}", current); + turbo_tasks: Arc>, + ) -> Result { + Self::execute_read_collectibles( + read_task_id, + |turbo_tasks| { + let backend = turbo_tasks.backend(); + backend.with_scope(scope_id, |scope| { + scope.read_collectibles_and_children(scope_id, trait_type_id, read_task_id) + }) + }, + trait_type_id, + turbo_tasks, + ) + .await + } + + async fn execute_read_collectibles( + read_task_id: TaskId, + read_collectibles_and_children: impl Fn( + &dyn TurboTasksBackendApi, + ) -> Result< + (CountHashSet, Vec), + EventListener, + >, + trait_type_id: TraitTypeId, + turbo_tasks: Arc>, + ) -> Result { + let (mut current, children) = loop { + // For performance reasons we only want to read collectibles when there are no + // unfinished tasks anymore. + match read_collectibles_and_children(&*turbo_tasks) { + Ok(r) => break r, + Err(listener) => listener.await, + } + }; + let backend = turbo_tasks.backend(); let children = children .into_iter() - .map(|child| { - let task = backend.get_or_create_read_collectibles_task( - child, - trait_type_id, - task_id, - false, - &*turbo_tasks, - ); - // Safety: RawVcSet is a transparent value - unsafe { - RawVc::TaskOutput(task).into_transparent_read::>() - } + .filter_map(|child| { + backend.with_scope(child, |scope| { + scope.is_propagating_collectibles().then(|| { + let task = backend.get_or_create_read_scope_collectibles_task( + child, + trait_type_id, + read_task_id, + &*turbo_tasks, + ); + // Safety: RawVcSet is a transparent value + unsafe { + RawVc::TaskOutput(task) + .into_transparent_read::>() + } + }) + }) }) - .collect::>(); - Box::pin(async move { - let children = children.into_iter().try_join().await?; - for child in children { - for v in child.iter() { - current.add(*v); - } + .try_join() + .await?; + for child in children { + for v in child.iter() { + current.add(*v); } - Ok(RawVcSetVc::cell(current.iter().copied().collect()).into()) - }) + } + Ok(RawVcSetVc::cell(current.iter().copied().collect()).into()) } pub(crate) fn read_task_collectibles( @@ -2018,22 +2226,15 @@ impl Task { reader: TaskId, trait_id: TraitTypeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> RawVcSetVc { - let mut state = self.full_state_mut(); - state = self.ensure_root_scoped(state, backend, turbo_tasks); - if let TaskScopes::Root(scope_id) = state.scopes { - let task = backend.get_or_create_read_collectibles_task( - scope_id, - trait_id, - reader, - true, - turbo_tasks, - ); - RawVc::TaskOutput(task).into() - } else { - panic!("It's not possible to read collectibles from a non-root scope") - } + let task = backend.get_or_create_read_task_collectibles_task( + self.id, + trait_id, + reader, + turbo_tasks, + ); + RawVc::TaskOutput(task).into() } pub(crate) fn emit_collectible( @@ -2041,7 +2242,7 @@ impl Task { trait_type: TraitTypeId, collectible: RawVc, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut state = self.full_state_mut(); if state.collectibles.emit(trait_type, collectible) { @@ -2066,7 +2267,7 @@ impl Task { trait_type: TraitTypeId, collectible: RawVc, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { let mut state = self.full_state_mut(); if state.collectibles.unemit(trait_type, collectible) { @@ -2107,7 +2308,7 @@ impl Task { scope_active_cache: &mut HashMap>, stats: &mut GcStats, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option { if !self.is_pure() { stats.no_gc_needed += 1; @@ -2403,7 +2604,7 @@ impl Task { &self, mut full_state: FullTaskWriteGuard<'_>, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { let mut clear_dependencies = None; let TaskState { @@ -2541,6 +2742,17 @@ impl Task { true } + + pub fn get_read_collectibles_task( + &self, + trait_id: TraitTypeId, + create_new: impl FnOnce() -> TaskId, + ) -> TaskId { + let mut state = self.full_state_mut(); + state + .collectibles + .get_read_collectibles_task(trait_id, create_new) + } } fn remove_collectible_from_scopes( @@ -2548,7 +2760,7 @@ fn remove_collectible_from_scopes( unemitted: AutoSet<(TraitTypeId, RawVc)>, task_scopes: &TaskScopes, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { task_scopes.iter().for_each(|id| { backend.with_scope(id, |scope| { @@ -2578,7 +2790,7 @@ fn remove_from_scopes( tasks: AutoSet>, task_scopes: &TaskScopes, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { match task_scopes { TaskScopes::Root(scope) => { @@ -2604,7 +2816,7 @@ pub fn run_add_to_scope_queue( id: TaskScopeId, merging_scopes: usize, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { while let Some(child) = queue.pop_front() { backend.with_task(child, |child| { @@ -2635,7 +2847,7 @@ pub fn run_remove_from_scope_queue( mut queue: VecDeque, id: TaskScopeId, backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) { while let Some(child) = queue.pop_front() { backend.with_task(child, |child| { diff --git a/crates/turbo-tasks-memory/tests/collectibles.rs b/crates/turbo-tasks-memory/tests/collectibles.rs index 6de54f855c0ea9..c4223ee3072e44 100644 --- a/crates/turbo-tasks-memory/tests/collectibles.rs +++ b/crates/turbo-tasks-memory/tests/collectibles.rs @@ -1,23 +1,35 @@ #![feature(min_specialization)] -use std::collections::HashSet; +use std::{collections::HashSet, time::Duration}; -use anyhow::Result; +use anyhow::{bail, Result}; +use tokio::time::sleep; use turbo_tasks::{emit, primitives::StringVc, CollectiblesSource, ValueToString, ValueToStringVc}; use turbo_tasks_testing::{register, run}; register!(); +#[tokio::test] +async fn emitting() { + run! { + let result = my_emitting_function_with_result(""); + let list = result.peek_collectibles::().strongly_consistent().await?; + assert_eq!(list.len(), 1); + assert_eq!(list.into_iter().next().unwrap().to_string().await?.as_str(), "123"); + assert_eq!(result.strongly_consistent().await?.0, 42); + } +} + #[tokio::test] async fn transitive_emitting() { run! { let result = my_transitive_emitting_function("", ""); - let list = result.peek_collectibles::().await?; + let list = result.peek_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); let mut expected = ["123", "42"].into_iter().collect::>(); for collectible in list { assert!(expected.remove(collectible.to_string().await?.as_str())) } - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); } } @@ -25,13 +37,13 @@ async fn transitive_emitting() { async fn multi_emitting() { run! { let result = my_multi_emitting_function(); - let list = result.peek_collectibles::().await?; + let list = result.peek_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); let mut expected = ["123", "42"].into_iter().collect::>(); for collectible in list { assert!(expected.remove(collectible.to_string().await?.as_str())) } - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); } } @@ -39,11 +51,11 @@ async fn multi_emitting() { async fn taking_collectibles() { run! { let result = my_collecting_function(); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; // my_collecting_function already processed the collectibles so the list should // be empty assert!(list.is_empty()); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); } } @@ -51,11 +63,11 @@ async fn taking_collectibles() { async fn taking_collectibles_extra_layer() { run! { let result = my_collecting_function_indirect(); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; // my_collecting_function already processed the collectibles so the list should // be empty assert!(list.is_empty()); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); } } @@ -63,36 +75,39 @@ async fn taking_collectibles_extra_layer() { async fn taking_collectibles_parallel() { run! { let result = my_transitive_emitting_function("", "a"); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); let result = my_transitive_emitting_function("", "b"); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); let result = my_transitive_emitting_function_with_child_scope("", "b", "1"); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); let result = my_transitive_emitting_function_with_child_scope("", "b", "2"); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); let result = my_transitive_emitting_function_with_child_scope("", "c", "3"); - let list = result.take_collectibles::().await?; + let list = result.take_collectibles::().strongly_consistent().await?; assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result.strongly_consistent().await?.0, 0); } } #[turbo_tasks::function] async fn my_collecting_function() -> Result { let result = my_transitive_emitting_function("", ""); - result.take_collectibles::().await?; + let list = result.take_collectibles::().await?; + if list.len() != 2 { + bail!("Expected 2 collectibles, got {}", list.len()); + } Ok(result) } @@ -102,7 +117,9 @@ async fn my_collecting_function_indirect() -> Result { let list = result.peek_collectibles::().await?; // my_collecting_function already processed the collectibles so the list should // be empty - assert!(list.is_empty()); + if !list.is_empty() { + bail!("Expected 0 collectibles, got {}", list.len()); + } Ok(result) } @@ -134,11 +151,21 @@ async fn my_transitive_emitting_function_with_child_scope( #[turbo_tasks::function] async fn my_emitting_function(_key: &str) -> Result<()> { + sleep(Duration::from_millis(100)).await; emit(ThingVc::new(123).as_value_to_string()); emit(ThingVc::new(42).as_value_to_string()); Ok(()) } +#[turbo_tasks::function] +async fn my_emitting_function_with_result(_key: &str) -> Result { + sleep(Duration::from_millis(100)).await; + println!("emitting"); + emit(ThingVc::new(123).as_value_to_string()); + println!("emitted"); + Ok(ThingVc::new(42)) +} + #[turbo_tasks::value(shared)] struct Thing(u32); diff --git a/crates/turbo-tasks-testing/src/lib.rs b/crates/turbo-tasks-testing/src/lib.rs index 0bcd8788d78aee..342247d4938178 100644 --- a/crates/turbo-tasks-testing/src/lib.rs +++ b/crates/turbo-tasks-testing/src/lib.rs @@ -200,6 +200,10 @@ impl TurboTasksApi for VcStorage { let cell = map.entry((task, index)).or_default(); *cell = content; } + + fn connect_task(&self, _task: TaskId) { + // no-op + } } impl VcStorage { diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index ecd039b8586269..002660eed4d608 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -175,17 +175,17 @@ pub trait Backend: Sync + Send { fn initialize(&mut self, task_id_provider: &dyn TaskIdProvider) {} #[allow(unused_variables)] - fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} + fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} #[allow(unused_variables)] - fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} + fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} #[allow(unused_variables)] - fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} + fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi) {} - fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi); + fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi); - fn invalidate_tasks(&self, tasks: Vec, turbo_tasks: &dyn TurboTasksBackendApi); + fn invalidate_tasks(&self, tasks: Vec, turbo_tasks: &dyn TurboTasksBackendApi); fn get_task_description(&self, task: TaskId) -> String; @@ -202,14 +202,14 @@ pub trait Backend: Sync + Send { fn try_start_task_execution( &self, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Option; fn task_execution_result( &self, task: TaskId, result: Result, Option>>, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ); fn task_execution_completed( @@ -218,13 +218,13 @@ pub trait Backend: Sync + Send { duration: Duration, instant: Instant, stateful: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool; fn run_backend_job<'a>( &'a self, id: BackendJobId, - turbo_tasks: &'a dyn TurboTasksBackendApi, + turbo_tasks: &'a dyn TurboTasksBackendApi, ) -> Pin + Send + 'a>>; fn try_read_task_output( @@ -232,7 +232,7 @@ pub trait Backend: Sync + Send { task: TaskId, reader: TaskId, strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result>; /// INVALIDATION: Be careful with this, it will not track dependencies, so @@ -241,7 +241,7 @@ pub trait Backend: Sync + Send { &self, task: TaskId, strongly_consistent: bool, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result>; fn try_read_task_cell( @@ -249,7 +249,7 @@ pub trait Backend: Sync + Send { task: TaskId, index: CellId, reader: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result>; /// INVALIDATION: Be careful with this, it will not track dependencies, so @@ -258,7 +258,7 @@ pub trait Backend: Sync + Send { &self, task: TaskId, index: CellId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result>; /// INVALIDATION: Be careful with this, it will not track dependencies, so @@ -267,7 +267,7 @@ pub trait Backend: Sync + Send { &self, current_task: TaskId, index: CellId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result { match self.try_read_task_cell_untracked(current_task, index, turbo_tasks)? { Ok(content) => Ok(content), @@ -280,7 +280,7 @@ pub trait Backend: Sync + Send { task: TaskId, trait_id: TraitTypeId, reader: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> RawVcSetVc; fn emit_collectible( @@ -288,7 +288,7 @@ pub trait Backend: Sync + Send { trait_type: TraitTypeId, collectible: RawVc, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ); fn unemit_collectible( @@ -296,7 +296,7 @@ pub trait Backend: Sync + Send { trait_type: TraitTypeId, collectible: RawVc, task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ); fn update_task_cell( @@ -304,28 +304,35 @@ pub trait Backend: Sync + Send { task: TaskId, index: CellId, content: CellContent, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ); fn get_or_create_persistent_task( &self, task_type: PersistentTaskType, parent_task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId; + fn connect_task( + &self, + task: TaskId, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ); + fn create_transient_task( &self, task_type: TransientTaskType, - turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId; } impl PersistentTaskType { - pub async fn run_resolve_native( + pub async fn run_resolve_native( fn_id: FunctionId, inputs: Vec, - turbo_tasks: Arc, + turbo_tasks: Arc>, ) -> Result { let mut resolved_inputs = Vec::with_capacity(inputs.len()); for input in inputs.into_iter() { @@ -334,11 +341,11 @@ impl PersistentTaskType { Ok(turbo_tasks.native_call(fn_id, resolved_inputs)) } - pub async fn run_resolve_trait( + pub async fn run_resolve_trait( trait_type: TraitTypeId, name: Cow<'static, str>, inputs: Vec, - turbo_tasks: Arc, + turbo_tasks: Arc>, ) -> Result { let mut resolved_inputs = Vec::with_capacity(inputs.len()); let mut iter = inputs.into_iter(); @@ -381,9 +388,9 @@ impl PersistentTaskType { } } - pub fn run( + pub fn run( self, - turbo_tasks: Arc, + turbo_tasks: Arc>, ) -> Pin> + Send>> { match self { PersistentTaskType::Native(fn_id, inputs) => { diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index d244268e256744..7c50ceff482f78 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -108,6 +108,8 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { fn read_current_task_cell(&self, index: CellId) -> Result; fn update_current_task_cell(&self, index: CellId, content: CellContent); + + fn connect_task(&self, task: TaskId); } /// The type of stats reporting. @@ -168,8 +170,10 @@ impl Unused { } } -pub trait TurboTasksBackendApi: TaskIdProvider + TurboTasksCallApi + Sync + Send { - fn pin(&self) -> Arc; +pub trait TurboTasksBackendApi: + TaskIdProvider + TurboTasksCallApi + Sync + Send +{ + fn pin(&self) -> Arc>; fn schedule(&self, task: TaskId); fn schedule_backend_background_job(&self, id: BackendJobId); @@ -194,6 +198,8 @@ pub trait TurboTasksBackendApi: TaskIdProvider + TurboTasksCallApi + Sync + Send fn set_stats_type(&self, stats_type: StatsType); /// Returns the duration from the start of the program to the given instant. fn program_duration_until(&self, instant: Instant) -> Duration; + /// Returns a reference to the backend. + fn backend(&self) -> &B; } impl StatsType { @@ -208,7 +214,7 @@ impl StatsType { } } -impl TaskIdProvider for &dyn TurboTasksBackendApi { +impl TaskIdProvider for &dyn TurboTasksBackendApi { fn get_fresh_task_id(&self) -> Unused { (*self).get_fresh_task_id() } @@ -272,7 +278,7 @@ task_local! { static CURRENT_TASK_STATE: RefCell; } -impl TurboTasks { +impl TurboTasks { // TODO better lifetime management for turbo tasks // consider using unsafe for the task_local turbo tasks // that should be safe as long tasks can't outlife turbo task @@ -726,7 +732,7 @@ impl TurboTasks { } } -impl TurboTasksCallApi for TurboTasks { +impl TurboTasksCallApi for TurboTasks { fn dynamic_call(&self, func: FunctionId, inputs: Vec) -> RawVc { self.dynamic_call(func, inputs) } @@ -768,7 +774,7 @@ impl TurboTasksCallApi for TurboTasks { } } -impl TurboTasksApi for TurboTasks { +impl TurboTasksApi for TurboTasks { fn invalidate(&self, task: TaskId) { self.backend.invalidate_task(task, self); } @@ -885,12 +891,20 @@ impl TurboTasksApi for TurboTasks { self, ); } + + fn connect_task(&self, task: TaskId) { + self.backend + .connect_task(task, current_task("connecting task"), self); + } } -impl TurboTasksBackendApi for TurboTasks { - fn pin(&self) -> Arc { +impl TurboTasksBackendApi for TurboTasks { + fn pin(&self) -> Arc> { self.pin() } + fn backend(&self) -> &B { + &self.backend + } #[track_caller] fn schedule_backend_background_job(&self, id: BackendJobId) { self.schedule_background_job(move |this| async move { @@ -993,7 +1007,7 @@ impl TurboTasksBackendApi for TurboTasks { } } -impl TaskIdProvider for TurboTasks { +impl TaskIdProvider for TurboTasks { fn get_fresh_task_id(&self) -> Unused { // Safety: This is a fresh id from the factory unsafe { Unused::new_unchecked(self.task_id_factory.get()) } diff --git a/crates/turbo-tasks/src/raw_vc.rs b/crates/turbo-tasks/src/raw_vc.rs index 509955726d7cbf..a3fbf077150d9a 100644 --- a/crates/turbo-tasks/src/raw_vc.rs +++ b/crates/turbo-tasks/src/raw_vc.rs @@ -1,7 +1,7 @@ use std::{ any::Any, fmt::{Debug, Display}, - future::Future, + future::{Future, IntoFuture}, hash::Hash, marker::PhantomData, pin::Pin, @@ -284,6 +284,11 @@ impl RawVc { } } + pub fn connect(&self) { + let tt = turbo_tasks(); + tt.connect_task(self.get_task_id()); + } + pub fn is_resolved(&self) -> bool { match self { RawVc::TaskOutput(_) => false, @@ -338,7 +343,7 @@ impl CollectiblesSource for RawVc { let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.strongly_consistent(), + inner: set.into_future(), take: false, phantom: PhantomData, } @@ -350,7 +355,7 @@ impl CollectiblesSource for RawVc { let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.strongly_consistent(), + inner: set.into_future(), take: true, phantom: PhantomData, } @@ -501,6 +506,13 @@ pub struct CollectiblesFuture { phantom: PhantomData T>, } +impl CollectiblesFuture { + pub fn strongly_consistent(mut self) -> Self { + self.inner.strongly_consistent = true; + self + } +} + impl Future for CollectiblesFuture { type Output = Result>; diff --git a/crates/turbopack-core/src/issue/mod.rs b/crates/turbopack-core/src/issue/mod.rs index 1ab17a3d38046a..17e425e1510c50 100644 --- a/crates/turbopack-core/src/issue/mod.rs +++ b/crates/turbopack-core/src/issue/mod.rs @@ -301,11 +301,11 @@ impl IssueVc { source: T, ) -> Result { Ok(CapturedIssuesVc::cell(CapturedIssues { - issues: source.peek_collectibles().await?, + issues: source.peek_collectibles().strongly_consistent().await?, #[cfg(feature = "issue_path")] processing_path: ItemIssueProcessingPathVc::cell(ItemIssueProcessingPath( None, - source.peek_collectibles().await?, + source.peek_collectibles().strongly_consistent().await?, )), })) } @@ -318,11 +318,11 @@ impl IssueVc { source: T, ) -> Result { Ok(CapturedIssuesVc::cell(CapturedIssues { - issues: source.take_collectibles().await?, + issues: source.take_collectibles().strongly_consistent().await?, #[cfg(feature = "issue_path")] processing_path: ItemIssueProcessingPathVc::cell(ItemIssueProcessingPath( None, - source.take_collectibles().await?, + source.take_collectibles().strongly_consistent().await?, )), })) }