diff --git a/crates/turbo-tasks-memory/Cargo.toml b/crates/turbo-tasks-memory/Cargo.toml index 00710f11b4092..ab918a4e4dce8 100644 --- a/crates/turbo-tasks-memory/Cargo.toml +++ b/crates/turbo-tasks-memory/Cargo.toml @@ -44,6 +44,8 @@ log_connect_tasks = [] report_expensive = [] print_scope_updates = [] print_task_invalidation = [] +inline_add_to_scope = [] +inline_remove_from_scope = [] [[bench]] name = "mod" diff --git a/crates/turbo-tasks-memory/src/count_hash_set.rs b/crates/turbo-tasks-memory/src/count_hash_set.rs index a805e86191471..42c4b8ec345bb 100644 --- a/crates/turbo-tasks-memory/src/count_hash_set.rs +++ b/crates/turbo-tasks-memory/src/count_hash_set.rs @@ -107,6 +107,11 @@ impl CountHashSet { self.add_count(item, 1) } + /// Returns the current count of an item + pub fn get(&self, item: &T) -> isize { + *self.inner.get(item).unwrap_or(&0) + } + /// Returns true when the value is no longer visible from outside pub fn remove_count(&mut self, item: T, count: usize) -> bool { match self.inner.entry(item) { diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index c4c01192ee085..5c058015e0d84 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -1,10 +1,10 @@ use std::{ - borrow::Cow, + borrow::{Borrow, Cow}, cell::RefCell, cmp::min, collections::VecDeque, future::Future, - hash::BuildHasherDefault, + hash::{BuildHasher, BuildHasherDefault, Hash}, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -24,8 +24,9 @@ use turbo_tasks::{ TransientTaskType, }, event::EventListener, + primitives::RawVcSetVc, util::{IdFactory, NoMoveVec}, - CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, + CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused, }; use crate::{ @@ -48,6 +49,8 @@ pub struct MemoryBackend { backend_jobs: NoMoveVec, backend_job_id_factory: IdFactory, task_cache: DashMap, TaskId, BuildHasherDefault>, + read_collectibles_task_cache: + DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault>, memory_limit: usize, gc_queue: Option, idle_gc_active: AtomicBool, @@ -76,6 +79,7 @@ impl MemoryBackend { backend_jobs: NoMoveVec::new(), backend_job_id_factory: IdFactory::new(), task_cache: DashMap::default(), + read_collectibles_task_cache: DashMap::default(), memory_limit, gc_queue: (memory_limit != usize::MAX).then(GcQueue::new), idle_gc_active: AtomicBool::new(false), @@ -121,12 +125,22 @@ impl MemoryBackend { for id in self.task_cache.clone().into_read_only().values() { func(*id); } + for id in self + .read_collectibles_task_cache + .clone() + .into_read_only() + .values() + { + func(*id); + } } + #[inline(always)] pub fn with_task(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T { func(self.memory_tasks.get(*id).unwrap()) } + #[inline(always)] pub fn with_scope(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T { func(self.memory_task_scopes.get(*id).unwrap()) } @@ -261,6 +275,99 @@ impl MemoryBackend { } } } + + pub(crate) fn get_or_create_read_collectibles_task( + &self, + scope_id: TaskScopeId, + trait_type: TraitTypeId, + parent_task: TaskId, + root_scoped: bool, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + if let Some(task) = self.lookup_and_connect_task( + parent_task, + &self.read_collectibles_task_cache, + &(scope_id, trait_type), + turbo_tasks, + ) { + // fast pass without creating a new task + task + } else { + // slow pass with key lock + let id = turbo_tasks.get_fresh_task_id(); + let task = Task::new_read_collectibles( + // Safety: That task will hold the value, but we are still in + // control of the task + *unsafe { id.get_unchecked() }, + scope_id, + trait_type, + turbo_tasks.stats_type(), + ); + self.insert_and_connect_fresh_task( + parent_task, + &self.read_collectibles_task_cache, + (scope_id, trait_type), + id, + task, + root_scoped, + turbo_tasks, + ) + } + } + + fn insert_and_connect_fresh_task( + &self, + parent_task: TaskId, + task_cache: &DashMap, + key: K, + new_id: Unused, + task: Task, + root_scoped: bool, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + let new_id = new_id.into(); + // Safety: We have a fresh task id that nobody knows about yet + let task = unsafe { self.memory_tasks.insert(*new_id, task) }; + if root_scoped { + task.make_root_scoped(self, turbo_tasks); + } + let result_task = match task_cache.entry(key) { + Entry::Vacant(entry) => { + // This is the most likely case + entry.insert(new_id); + new_id + } + Entry::Occupied(entry) => { + // Safety: We have a fresh task id that nobody knows about yet + unsafe { + self.memory_tasks.remove(*new_id); + let new_id = Unused::new_unchecked(new_id); + turbo_tasks.reuse_task_id(new_id); + } + *entry.get() + } + }; + self.connect_task_child(parent_task, result_task, turbo_tasks); + result_task + } + + fn lookup_and_connect_task( + &self, + parent_task: TaskId, + task_cache: &DashMap, + key: &Q, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Option + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + task_cache.get(key).map(|task| { + self.connect_task_child(parent_task, *task, turbo_tasks); + + *task + }) + } } impl Backend for MemoryBackend { @@ -445,15 +552,15 @@ impl Backend for MemoryBackend { }) } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, id: TaskId, trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { self.with_task(id, |task| { - task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks) + task.read_task_collectibles(reader, trait_id, self, turbo_tasks) }) } @@ -519,12 +626,10 @@ impl Backend for MemoryBackend { parent_task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { - let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) { + if let Some(task) = + self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks) + { // fast pass without creating a new task - self.connect_task_child(parent_task, task, turbo_tasks); - - // TODO maybe force (background) scheduling to avoid inactive tasks hanging in - // "in progress" until they become active task } else { // It's important to avoid overallocating memory as this will go into the task @@ -533,30 +638,23 @@ impl Backend for MemoryBackend { let task_type = Arc::new(task_type); // slow pass with key lock let id = turbo_tasks.get_fresh_task_id(); - let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type()); - // Safety: We have a fresh task id that nobody knows about yet - unsafe { - self.memory_tasks.insert(*id, task); - } - let result_task = match self.task_cache.entry(task_type) { - Entry::Vacant(entry) => { - // This is the most likely case - entry.insert(id); - id - } - Entry::Occupied(entry) => { - // Safety: We have a fresh task id that nobody knows about yet - unsafe { - self.memory_tasks.remove(*id); - turbo_tasks.reuse_task_id(id); - } - *entry.get() - } - }; - self.connect_task_child(parent_task, result_task, turbo_tasks); - result_task - }; - result + let task = Task::new_persistent( + // Safety: That task will hold the value, but we are still in + // control of the task + *unsafe { id.get_unchecked() }, + task_type.clone(), + turbo_tasks.stats_type(), + ); + self.insert_and_connect_fresh_task( + parent_task, + &self.task_cache, + task_type, + id, + task, + false, + turbo_tasks, + ) + } } fn create_transient_task( @@ -572,6 +670,7 @@ impl Backend for MemoryBackend { scope.increment_unfinished_tasks(self); }); let stats_type = turbo_tasks.stats_type(); + let id = id.into(); let task = match task_type { TransientTaskType::Root(f) => Task::new_root(id, scope, move || f() as _, stats_type), TransientTaskType::Once(f) => Task::new_once(id, scope, f, stats_type), @@ -591,7 +690,13 @@ pub(crate) enum Job { ScheduleWhenDirtyFromScope(AutoSet), /// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to /// split off work. - AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool), + AddToScopeQueue { + queue: VecDeque, + scope: TaskScopeId, + /// Number of scopes that are currently being merged into this scope. + /// This information is only used for optimization. + merging_scopes: usize, + }, /// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to /// split off work. RemoveFromScopeQueue(VecDeque, TaskScopeId), @@ -618,7 +723,7 @@ impl Job { Job::RemoveFromScopes(tasks, scopes) => { for task in tasks { backend.with_task(task, |task| { - task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks) + task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks) }); } backend.scope_add_remove_priority.finish_high(); @@ -638,17 +743,15 @@ impl Job { }) } } - Job::AddToScopeQueue(queue, id, is_optimization_scope) => { + Job::AddToScopeQueue { + queue, + scope, + merging_scopes, + } => { backend .scope_add_remove_priority .run_low(async { - run_add_to_scope_queue( - queue, - id, - is_optimization_scope, - backend, - turbo_tasks, - ); + run_add_to_scope_queue(queue, scope, merging_scopes, backend, turbo_tasks); }) .await; } diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index 19f0092b29079..c30f7a731dbab 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -26,8 +26,9 @@ use turbo_tasks::{ ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph, PersistedGraphApi, ReadTaskState, TaskCell, TaskData, }, + primitives::RawVcSetVc, util::{IdFactory, NoMoveVec, SharedError}, - CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, + CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused, }; type RootTaskFn = @@ -1395,13 +1396,13 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, _task: TaskId, _trait_id: TraitTypeId, _reader: TaskId, _turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { todo!() } @@ -1484,6 +1485,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ } let task = turbo_tasks.get_fresh_task_id(); + let task = task.into(); let new_task = Task { active_parents: AtomicU32::new(1), task_state: Mutex::new(TaskState { @@ -1505,6 +1507,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ // SAFETY: We are still the only owner of this task and id unsafe { self.tasks.remove(*task); + let task = Unused::new_unchecked(task); turbo_tasks.reuse_task_id(task); } self.connect(parent_task, existing_task, turbo_tasks); @@ -1528,6 +1531,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { let task = turbo_tasks.get_fresh_task_id(); + let task = task.into(); let new_task = Task { active_parents: AtomicU32::new(1), task_state: Mutex::new(TaskState { @@ -1567,6 +1571,7 @@ impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi task_type: TaskType::Persistent(task_type.clone()), }; let task = self.turbo_tasks.get_fresh_task_id(); + let task = task.into(); // SAFETY: It's a fresh task id unsafe { self.backend.tasks.insert(*task, new_task); @@ -1574,7 +1579,9 @@ impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi match cache.entry(task_type) { Entry::Occupied(e) => { let value = *e.into_ref(); + // Safety: We didn't store the task id in the cache, we it's still unused unsafe { + let task = Unused::new_unchecked(task); self.turbo_tasks.reuse_task_id(task); } value diff --git a/crates/turbo-tasks-memory/src/scope.rs b/crates/turbo-tasks-memory/src/scope.rs index 65c182a48db92..06ffde390ebb2 100644 --- a/crates/turbo-tasks-memory/src/scope.rs +++ b/crates/turbo-tasks-memory/src/scope.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, fmt::{Debug, Display}, hash::Hash, mem::take, @@ -173,7 +172,7 @@ impl TaskScope { children: CountHashSet::new(), collectibles: AutoMap::new(), dependent_tasks: AutoSet::new(), - event: Event::new(|| { + event: Event::new(move || { #[cfg(feature = "print_scope_updates")] return format!("TaskScope({id})::event"); #[cfg(not(feature = "print_scope_updates"))] @@ -200,7 +199,7 @@ impl TaskScope { children: CountHashSet::new(), collectibles: AutoMap::new(), dependent_tasks: AutoSet::new(), - event: Event::new(|| { + event: Event::new(move || { #[cfg(feature = "print_scope_updates")] return format!("TaskScope({id})::event"); #[cfg(not(feature = "print_scope_updates"))] @@ -325,38 +324,18 @@ impl TaskScope { } } - pub fn read_collectibles( + pub fn read_collectibles_and_children( &self, self_id: TaskScopeId, trait_id: TraitTypeId, reader: TaskId, - backend: &MemoryBackend, - ) -> AutoSet { - let collectibles = self.read_collectibles_recursive( - self_id, - trait_id, - reader, - backend, - &mut HashMap::with_hasher(BuildNoHashHasher::default()), - ); - AutoSet::from_iter(collectibles.iter().copied()) - } - - fn read_collectibles_recursive( - &self, - self_id: TaskScopeId, - trait_id: TraitTypeId, - reader: TaskId, - backend: &MemoryBackend, - cache: &mut HashMap, BuildNoHashHasher>, - ) -> CountHashSet { - // TODO add reverse edges from task to scopes and (scope, trait_id) + ) -> (CountHashSet, Vec) { let mut state = self.state.lock(); let children = state.children.iter().copied().collect::>(); state.dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeChildren(self_id)); - let mut current = { + let current = { let (c, dependent_tasks) = state.collectibles.entry(trait_id).or_default(); dependent_tasks.insert(reader); Task::add_dependency_to_current(TaskDependency::ScopeCollectibles(self_id, trait_id)); @@ -364,22 +343,7 @@ impl TaskScope { }; drop(state); - for id in children { - backend.with_scope(id, |scope| { - let child = if let Some(cached) = cache.get(&id) { - cached - } else { - let child = - scope.read_collectibles_recursive(id, trait_id, reader, backend, cache); - cache.entry(id).or_insert(child) - }; - for v in child.iter() { - current.add(*v); - } - }) - } - - current + (current, children) } pub(crate) fn remove_dependent_task(&self, reader: TaskId) { @@ -681,12 +645,21 @@ impl TaskScopeState { match self.collectibles.entry(trait_id) { Entry::Occupied(mut entry) => { let (collectibles, dependent_tasks) = entry.get_mut(); - if collectibles.remove_count(collectible, count) { + let old_value = collectibles.get(&collectible); + let new_value = old_value - count as isize; + // NOTE: The read_collectibles need to be invalidated when negative count + // changes. Each negative count will eliminate one child scope emitted + // collectible. So changing from -1 to -2 might affect the visible collectibles. + if collectibles.remove_count(collectible, count) || new_value < 0 { let notify = take(dependent_tasks); if collectibles.is_unset() { entry.remove(); } - log_scope_update!("remove_collectible {} -> {}", *self.id, collectible); + log_scope_update!( + "remove_collectible {} -> {} ({old_value} -> {new_value})", + *self.id, + collectible + ); Some(ScopeCollectibleChangeEffect { notify }) } else { None diff --git a/crates/turbo-tasks-memory/src/stats.rs b/crates/turbo-tasks-memory/src/stats.rs index 215d65b5451c6..a62853736b38d 100644 --- a/crates/turbo-tasks-memory/src/stats.rs +++ b/crates/turbo-tasks-memory/src/stats.rs @@ -23,6 +23,7 @@ pub struct StatsReferences { pub enum StatsTaskType { Root(TaskId), Once(TaskId), + ReadCollectibles(TraitTypeId), Native(FunctionId), ResolveNative(FunctionId), ResolveTrait(TraitTypeId, String), @@ -33,6 +34,9 @@ impl Display for StatsTaskType { match self { StatsTaskType::Root(_) => write!(f, "root"), StatsTaskType::Once(_) => write!(f, "once"), + StatsTaskType::ReadCollectibles(t) => { + write!(f, "read collectibles {}", registry::get_trait(*t).name) + } StatsTaskType::Native(nf) => write!(f, "{}", registry::get_function(*nf).name), StatsTaskType::ResolveNative(nf) => { write!(f, "resolve {}", registry::get_function(*nf).name) @@ -184,7 +188,10 @@ impl Stats { pub fn merge_resolve(&mut self) { self.merge(|ty, _stats| match ty { - StatsTaskType::Root(_) | StatsTaskType::Once(_) | StatsTaskType::Native(_) => false, + StatsTaskType::Root(_) + | StatsTaskType::Once(_) + | StatsTaskType::Native(_) + | StatsTaskType::ReadCollectibles(..) => false, StatsTaskType::ResolveNative(_) | StatsTaskType::ResolveTrait(_, _) => true, }) } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 418ce6f53394b..be60432896950 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -24,7 +24,9 @@ use tokio::task_local; use turbo_tasks::{ backend::{PersistentTaskType, TaskExecutionSpec}, event::{Event, EventListener}, - get_invalidator, registry, CellId, Invalidator, RawVc, StatsType, TaskId, TraitTypeId, + get_invalidator, + primitives::{RawVcSet, RawVcSetVc}, + registry, CellId, Invalidator, RawVc, StatsType, TaskId, TraitTypeId, TryJoinIterExt, TurboTasksBackendApi, ValueTypeId, }; @@ -80,15 +82,43 @@ enum TaskType { /// applied. Once(OnceTaskFn), + /// A task that reads all collectibles of a certain trait from a + /// [TaskScope]. It will do that by recursively calling ReadCollectibles on + /// child scopes, so that results by scope are cached. + ReadCollectibles(TaskScopeId, TraitTypeId), + /// A normal persistent task Persistent(Arc), } +enum TaskTypeForDescription { + Root, + Once, + ReadCollectibles(TraitTypeId), + Persistent(Arc), +} + +impl TaskTypeForDescription { + fn from(task_type: &TaskType) -> Self { + match task_type { + TaskType::Root(..) => Self::Root, + TaskType::Once(..) => Self::Once, + TaskType::ReadCollectibles(.., trait_id) => Self::ReadCollectibles(*trait_id), + TaskType::Persistent(ty) => Self::Persistent(ty.clone()), + } + } +} + impl Debug for TaskType { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), + Self::ReadCollectibles(scope_id, trait_id) => f + .debug_tuple("ReadCollectibles") + .field(scope_id) + .field(®istry::get_trait(*trait_id).name) + .finish(), Self::Persistent(ty) => Debug::fmt(ty, f), } } @@ -99,6 +129,7 @@ impl Display for TaskType { match self { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), + Self::ReadCollectibles(..) => f.debug_tuple("ReadCollectibles").finish(), Self::Persistent(ty) => Display::fmt(ty, f), } } @@ -387,6 +418,23 @@ use self::meta_state::{ FullTaskWriteGuard, TaskMetaState, TaskMetaStateReadGuard, TaskMetaStateWriteGuard, }; +/// Heuristic when a task should switch to root scoped. +/// +/// The `optimization_counter` is a number how often a scope has been added to +/// this task (and therefore to all child tasks as well). We can assume that all +/// scopes might eventually be removed again. We assume that more scopes per +/// task have higher cost, so we want to avoid that. We assume that adding and +/// removing scopes again and again is not great. But having too many root +/// scopes is also not great as it hurts strongly consistent reads and read +/// collectibles. +/// +/// The current implementation uses a heuristic that says that the cost is +/// linear to the number of added scoped and linear to the number of children. +fn should_optimize_to_root_scoped(optimization_counter: usize, children_count: usize) -> bool { + const SCOPE_OPTIMIZATION_THRESHOLD: usize = 255; + optimization_counter * children_count > SCOPE_OPTIMIZATION_THRESHOLD +} + impl Task { pub(crate) fn new_persistent( id: TaskId, @@ -443,9 +491,28 @@ impl Task { } } + pub(crate) fn new_read_collectibles( + id: TaskId, + target_scope: TaskScopeId, + trait_type_id: TraitTypeId, + stats_type: StatsType, + ) -> Self { + let ty = TaskType::ReadCollectibles(target_scope, trait_type_id); + let description = Self::get_event_description_static(id, &ty); + Self { + id, + ty, + state: RwLock::new(TaskMetaState::Full(box TaskState::new( + description, + stats_type, + ))), + } + } + pub(crate) fn is_pure(&self) -> bool { match &self.ty { TaskType::Persistent(_) => true, + TaskType::ReadCollectibles(..) => true, TaskType::Root(_) => false, TaskType::Once(_) => false, } @@ -465,37 +532,19 @@ impl Task { } pub(crate) fn get_description(&self) -> String { - match &self.ty { - TaskType::Root(..) => format!("[{}] root", self.id), - TaskType::Once(..) => format!("[{}] once", self.id), - TaskType::Persistent(ty) => match &**ty { - PersistentTaskType::Native(native_fn, _) => { - format!("[{}] {}", self.id, registry::get_function(*native_fn).name) - } - PersistentTaskType::ResolveNative(native_fn, _) => { - format!( - "[{}] [resolve] {}", - self.id, - registry::get_function(*native_fn).name - ) - } - PersistentTaskType::ResolveTrait(trait_type, fn_name, _) => { - format!( - "[{}] [resolve trait] {} in trait {}", - self.id, - fn_name, - registry::get_trait(*trait_type).name - ) - } - }, - } + Self::format_description(&TaskTypeForDescription::from(&self.ty), self.id) } - fn format_description(ty: &TaskType, id: TaskId) -> String { + fn format_description(ty: &TaskTypeForDescription, id: TaskId) -> String { match ty { - TaskType::Root(..) => format!("[{}] root", id), - TaskType::Once(..) => format!("[{}] once", id), - TaskType::Persistent(ty) => match &**ty { + TaskTypeForDescription::Root => format!("[{}] root", id), + TaskTypeForDescription::Once => format!("[{}] once", id), + TaskTypeForDescription::ReadCollectibles(trait_type_id) => format!( + "[{}] read collectibles({})", + id, + registry::get_trait(*trait_type_id).name + ), + TaskTypeForDescription::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, _) => { format!("[{}] {}", id, registry::get_function(*native_fn).name) } @@ -522,8 +571,8 @@ impl Task { id: TaskId, ty: &TaskType, ) -> impl Fn() -> String + Send + Sync { - let ty = Self::format_description(ty, id); - move || ty.clone() + let ty = TaskTypeForDescription::from(ty); + move || Self::format_description(&ty, id) } fn get_event_description(&self) -> impl Fn() -> String + Send + Sync { @@ -617,7 +666,7 @@ impl Task { if !self.try_start_execution(&mut state, turbo_tasks, backend) { return None; } - let future = self.make_execution_future(&mut state, turbo_tasks); + let future = self.make_execution_future(state, backend, turbo_tasks); Some(TaskExecutionSpec { future }) } @@ -672,24 +721,44 @@ impl Task { /// Prepares task execution and returns a future that will execute the task. fn make_execution_future( self: &Task, - mut state: &mut TaskState, + mut state: FullTaskWriteGuard<'_>, + backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) -> Pin> + Send>> { match &self.ty { - TaskType::Root(bound_fn) => bound_fn(), - TaskType::Once(mutex) => mutex.lock().take().expect("Task can only be executed once"), + TaskType::Root(bound_fn) => { + drop(state); + bound_fn() + } + TaskType::Once(mutex) => { + drop(state); + mutex.lock().take().expect("Task can only be executed once") + } + &TaskType::ReadCollectibles(scope_id, trait_type_id) => { + drop(state); + Task::make_read_collectibles_execution_future( + self.id, + scope_id, + trait_type_id, + backend, + turbo_tasks, + ) + } TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(native_fn, inputs) => { - if let PrepareTaskType::Native(bound_fn) = &state.prepared_type { + let future = if let PrepareTaskType::Native(bound_fn) = &state.prepared_type { bound_fn() } else { let bound_fn = registry::get_function(*native_fn).bind(inputs); let future = bound_fn(); state.prepared_type = PrepareTaskType::Native(bound_fn); future - } + }; + drop(state); + future } PersistentTaskType::ResolveNative(ref native_fn, inputs) => { + drop(state); let native_fn = *native_fn; let inputs = inputs.clone(); let turbo_tasks = turbo_tasks.pin(); @@ -700,6 +769,7 @@ impl Task { )) } PersistentTaskType::ResolveTrait(trait_type, name, inputs) => { + drop(state); let trait_type = *trait_type; let name = name.clone(); let inputs = inputs.clone(); @@ -974,11 +1044,10 @@ impl Task { pub(crate) fn add_to_scope_internal_shallow( &self, id: TaskScopeId, - is_optimization_scope: bool, - depth: usize, + merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - queue: &mut VecDeque<(TaskId, usize)>, + queue: &mut VecDeque, ) { let mut state = self.full_state_mut(); let TaskState { @@ -1019,31 +1088,27 @@ impl Task { return; } - if depth < usize::BITS as usize { - if is_optimization_scope { - *optimization_counter = - optimization_counter.saturating_sub(children.len() >> depth) - } else { - *optimization_counter += children.len() >> depth; - if *optimization_counter >= 0x10000 { - list.remove(id); - drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); - return self.add_to_scope_internal_shallow( - id, - is_optimization_scope, - depth, - backend, - turbo_tasks, - queue, - ); - } + *optimization_counter += 1; + if merging_scopes > 0 { + *optimization_counter = optimization_counter.saturating_sub(merging_scopes); + } else { + if should_optimize_to_root_scoped(*optimization_counter, children.len()) { + list.remove(id); + drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); + return self.add_to_scope_internal_shallow( + id, + merging_scopes, + backend, + turbo_tasks, + queue, + ); } } if queue.capacity() == 0 { queue.reserve(max(children.len(), SPLIT_OFF_QUEUE_AT * 2)); } - queue.extend(children.iter().copied().map(|child| (child, depth + 1))); + queue.extend(children.iter().copied()); // add to dirty list of the scope (potentially schedule) let schedule_self = @@ -1060,22 +1125,15 @@ impl Task { pub(crate) fn add_to_scope_internal( &self, id: TaskScopeId, - is_optimization_scope: bool, + merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { // VecDeque::new() would allocate with 7 items capacity. We don't want that. let mut queue = VecDeque::with_capacity(0); - self.add_to_scope_internal_shallow( - id, - is_optimization_scope, - 0, - backend, - turbo_tasks, - &mut queue, - ); + self.add_to_scope_internal_shallow(id, merging_scopes, backend, turbo_tasks, &mut queue); - run_add_to_scope_queue(queue, id, is_optimization_scope, backend, turbo_tasks); + run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks); } fn add_self_to_new_scope( @@ -1332,7 +1390,7 @@ impl Task { } } - pub(crate) fn remove_root_or_initial_scope( + fn remove_root_or_initial_scope( &self, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -1368,6 +1426,15 @@ impl Task { } } + pub fn make_root_scoped( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + let state = self.full_state_mut(); + self.make_root_scoped_internal(state, backend, turbo_tasks); + } + fn make_root_scoped_internal<'a>( &self, mut state: FullTaskWriteGuard<'a>, @@ -1468,9 +1535,11 @@ impl Task { let schedule_self = self.add_self_to_new_scope(&mut state, root_scope, backend, turbo_tasks); + let mut merging_scopes = Vec::with_capacity(scopes.len()); // remove self from old scopes for (scope, count) in scopes.iter() { if *count > 0 { + merging_scopes.push(*scope); self.remove_self_from_scope_full(&mut state, *scope, backend, turbo_tasks); } } @@ -1483,7 +1552,12 @@ impl Task { // Add children to new root scope for child in children.iter() { backend.with_task(*child, |child| { - child.add_to_scope_internal(root_scope, true, backend, turbo_tasks); + child.add_to_scope_internal( + root_scope, + merging_scopes.len(), + backend, + turbo_tasks, + ); }) } @@ -1495,9 +1569,20 @@ impl Task { } // Remove children from old scopes - turbo_tasks.schedule_backend_foreground_job(backend.create_backend_job( - Job::RemoveFromScopes(children, scopes.into_iter().map(|(id, _)| id).collect()), - )); + #[cfg(feature = "inline_remove_from_scope")] + for task in children { + backend.with_task(task, |task| { + task.remove_from_scopes( + merging_scopes.iter().copied(), + backend, + turbo_tasks, + ) + }); + } + #[cfg(not(feature = "inline_remove_from_scope"))] + turbo_tasks.schedule_backend_foreground_job( + backend.create_backend_job(Job::RemoveFromScopes(children, merging_scopes)), + ); None } else { Some(state) @@ -1666,6 +1751,9 @@ impl Task { match &self.ty { TaskType::Root(_) => StatsTaskType::Root(self.id), TaskType::Once(_) => StatsTaskType::Once(self.id), + TaskType::ReadCollectibles(_, trait_type_id) => { + StatsTaskType::ReadCollectibles(*trait_type_id) + } TaskType::Persistent(ty) => match &**ty { PersistentTaskType::Native(f, _) => StatsTaskType::Native(*f), PersistentTaskType::ResolveNative(f, _) => StatsTaskType::ResolveNative(*f), @@ -1752,6 +1840,13 @@ impl Task { ) { let mut state = self.full_state_mut(); if state.children.insert(child_id) { + if let TaskScopes::Inner(_, optimization_counter) = &state.scopes { + if should_optimize_to_root_scoped(*optimization_counter, state.children.len()) { + state.children.remove(&child_id); + drop(self.make_root_scoped_internal(state, backend, turbo_tasks)); + return self.connect_child(child_id, backend, turbo_tasks); + } + } let scopes = state.scopes.clone(); drop(state); @@ -1759,7 +1854,7 @@ impl Task { for scope in scopes.iter() { #[cfg(not(feature = "report_expensive"))] { - child.add_to_scope_internal(scope, false, backend, turbo_tasks); + child.add_to_scope_internal(scope, 0, backend, turbo_tasks); } #[cfg(feature = "report_expensive")] { @@ -1768,7 +1863,7 @@ impl Task { use turbo_tasks::util::FormatDuration; let start = Instant::now(); - child.add_to_scope_internal(scope, false, backend, turbo_tasks); + child.add_to_scope_internal(scope, 0, backend, turbo_tasks); let elapsed = start.elapsed(); if elapsed.as_millis() >= 10 { println!( @@ -1880,29 +1975,62 @@ impl Task { } } - pub(crate) fn try_read_task_collectibles( + fn make_read_collectibles_execution_future( + task_id: TaskId, + scope_id: TaskScopeId, + trait_type_id: TraitTypeId, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Pin> + Send>> { + let (mut current, children) = backend.with_scope(scope_id, |scope| { + scope.read_collectibles_and_children(scope_id, trait_type_id, task_id) + }); + log_scope_update!("reading collectibles from {scope_id}: {:?}", current); + let children = children + .into_iter() + .map(|child| { + let task = backend.get_or_create_read_collectibles_task( + child, + trait_type_id, + task_id, + false, + &*turbo_tasks, + ); + // Safety: RawVcSet is a transparent value + unsafe { + RawVc::TaskOutput(task).into_transparent_read::>() + } + }) + .collect::>(); + Box::pin(async move { + let children = children.into_iter().try_join().await?; + for child in children { + for v in child.iter() { + current.add(*v); + } + } + Ok(RawVcSetVc::cell(current.iter().copied().collect()).into()) + }) + } + + pub(crate) fn read_task_collectibles( &self, reader: TaskId, trait_id: TraitTypeId, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>> { + ) -> RawVcSetVc { let mut state = self.full_state_mut(); state = self.ensure_root_scoped(state, backend, turbo_tasks); - // We need to wait for all foreground jobs to be finished as there could be - // ongoing add_to_scope jobs that need to be finished before reading - // from scopes - if let Err(listener) = turbo_tasks.try_foreground_done() { - return Ok(Err(listener)); - } if let TaskScopes::Root(scope_id) = state.scopes { - backend.with_scope(scope_id, |scope| { - if let Some(l) = scope.has_unfinished_tasks() { - return Ok(Err(l)); - } - let set = scope.read_collectibles(scope_id, trait_id, reader, backend); - Ok(Ok(set)) - }) + let task = backend.get_or_create_read_collectibles_task( + scope_id, + trait_id, + reader, + true, + turbo_tasks, + ); + RawVc::TaskOutput(task).into() } else { panic!("It's not possible to read collectibles from a non-root scope") } @@ -2472,27 +2600,31 @@ const SPLIT_OFF_QUEUE_AT: usize = 100; /// Adds a list of tasks and their children to a scope, recursively. pub fn run_add_to_scope_queue( - mut queue: VecDeque<(TaskId, usize)>, + mut queue: VecDeque, id: TaskScopeId, - is_optimization_scope: bool, + merging_scopes: usize, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { - while let Some((child, depth)) = queue.pop_front() { + while let Some(child) = queue.pop_front() { backend.with_task(child, |child| { child.add_to_scope_internal_shallow( id, - is_optimization_scope, - depth, + merging_scopes, backend, turbo_tasks, &mut queue, ); }); + #[cfg(not(feature = "inline_add_to_scope"))] while queue.len() > SPLIT_OFF_QUEUE_AT { let split_off_queue = queue.split_off(queue.len() - SPLIT_OFF_QUEUE_AT); turbo_tasks.schedule_backend_foreground_job(backend.create_backend_job( - Job::AddToScopeQueue(split_off_queue, id, is_optimization_scope), + Job::AddToScopeQueue { + queue: split_off_queue, + scope: id, + merging_scopes, + }, )); } } @@ -2509,6 +2641,7 @@ pub fn run_remove_from_scope_queue( backend.with_task(child, |child| { child.remove_from_scope_internal_shallow(id, backend, turbo_tasks, &mut queue); }); + #[cfg(not(feature = "inline_remove_from_scope"))] while queue.len() > SPLIT_OFF_QUEUE_AT { let split_off_queue = queue.split_off(queue.len() - SPLIT_OFF_QUEUE_AT); diff --git a/crates/turbo-tasks-testing/src/lib.rs b/crates/turbo-tasks-testing/src/lib.rs index 526bca03adc3a..0bcd8788d78ae 100644 --- a/crates/turbo-tasks-testing/src/lib.rs +++ b/crates/turbo-tasks-testing/src/lib.rs @@ -18,6 +18,7 @@ use auto_hash_map::AutoSet; use turbo_tasks::{ backend::CellContent, event::{Event, EventListener}, + primitives::RawVcSetVc, registry, test_helpers::{current_task_for_testing, with_turbo_tasks_for_testing}, CellId, RawVc, TaskId, TraitTypeId, TurboTasksApi, TurboTasksCallApi, @@ -179,11 +180,7 @@ impl TurboTasksApi for VcStorage { unimplemented!() } - fn try_read_task_collectibles( - &self, - _task: TaskId, - _trait_id: TraitTypeId, - ) -> Result, EventListener>> { + fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> RawVcSetVc { unimplemented!() } diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index 2f4ab3b4dd056..ecd039b858626 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -10,14 +10,13 @@ use std::{ }; use anyhow::{anyhow, Result}; -use auto_hash_map::AutoSet; use serde::{Deserialize, Serialize}; pub use crate::id::BackendJobId; use crate::{ - event::EventListener, manager::TurboTasksBackendApi, raw_vc::CellId, registry, - task_input::SharedReference, FunctionId, RawVc, ReadRef, TaskId, TaskIdProvider, TaskInput, - TraitTypeId, + event::EventListener, manager::TurboTasksBackendApi, primitives::RawVcSetVc, raw_vc::CellId, + registry, task_input::SharedReference, FunctionId, RawVc, ReadRef, TaskId, TaskIdProvider, + TaskInput, TraitTypeId, }; /// Different Task types @@ -276,13 +275,13 @@ pub trait Backend: Sync + Send { } } - fn try_read_task_collectibles( + fn read_task_collectibles( &self, task: TaskId, trait_id: TraitTypeId, reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Result, EventListener>>; + ) -> RawVcSetVc; fn emit_collectible( &self, diff --git a/crates/turbo-tasks/src/collectibles.rs b/crates/turbo-tasks/src/collectibles.rs index b8fcc1f259c03..841ab151061a2 100644 --- a/crates/turbo-tasks/src/collectibles.rs +++ b/crates/turbo-tasks/src/collectibles.rs @@ -1,24 +1,4 @@ -use anyhow::{anyhow, Result}; - -use crate::{ - self as turbo_tasks, manager::read_task_collectibles, primitives::RawVcSetVc, - CollectiblesFuture, RawVc, TraitTypeId, -}; - -#[turbo_tasks::function] -#[allow(dead_code)] // It's used indirectly -pub async fn read_collectibles(raw: RawVc, trait_type: usize) -> Result { - if let RawVc::TaskOutput(task) = raw { - let tt = crate::turbo_tasks(); - let set = read_task_collectibles(&*tt, task, TraitTypeId::from(trait_type)).await?; - Ok(RawVcSetVc::cell(set)) - } else { - Err(anyhow!( - "peek/take_collectibles was called on Vc that points to a cell (instead of a Vc that \ - points to a task output)" - )) - } -} +use crate::{self as turbo_tasks, CollectiblesFuture}; pub trait CollectiblesSource { fn take_collectibles(self) -> CollectiblesFuture; diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index 571cbd140efa1..2c55fdb3b096a 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -75,7 +75,7 @@ pub use join_iter_ext::{JoinIterExt, TryJoinIterExt}; pub use manager::{ dynamic_call, emit, get_invalidator, mark_stateful, run_once, spawn_blocking, spawn_thread, trait_call, turbo_tasks, Invalidator, StatsType, TaskIdProvider, TurboTasks, TurboTasksApi, - TurboTasksBackendApi, TurboTasksCallApi, + TurboTasksBackendApi, TurboTasksCallApi, Unused, }; pub use native_function::{NativeFunction, NativeFunctionVc}; pub use nothing::{Nothing, NothingVc}; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 0ffcbe3f0e3f3..3cb09870bf0a7 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -27,6 +27,7 @@ use crate::{ event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, id_factory::IdFactory, + primitives::RawVcSetVc, raw_vc::{CellId, RawVc}, registry, task_input::{SharedReference, TaskInput}, @@ -91,11 +92,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { index: CellId, ) -> Result>; - fn try_read_task_collectibles( - &self, - task: TaskId, - trait_id: TraitTypeId, - ) -> Result, EventListener>>; + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> RawVcSetVc; fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc); fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc); @@ -126,20 +123,48 @@ pub enum StatsType { } pub trait TaskIdProvider { - fn get_fresh_task_id(&self) -> TaskId; - /// # Safety - /// - /// It must be ensured that the id is no longer used - unsafe fn reuse_task_id(&self, id: TaskId); + fn get_fresh_task_id(&self) -> Unused; + fn reuse_task_id(&self, id: Unused); } impl TaskIdProvider for IdFactory { - fn get_fresh_task_id(&self) -> TaskId { - self.get() + fn get_fresh_task_id(&self) -> Unused { + // Safety: This is a fresh id from the factory + unsafe { Unused::new_unchecked(self.get()) } } - unsafe fn reuse_task_id(&self, id: TaskId) { - unsafe { self.reuse(id) } + fn reuse_task_id(&self, id: Unused) { + unsafe { self.reuse(id.into()) } + } +} + +/// A wrapper around a value that is unused. +pub struct Unused { + inner: T, +} + +impl Unused { + /// Creates a new unused value. + /// + /// # Safety + /// + /// The wrapped value must not be used. + pub unsafe fn new_unchecked(inner: T) -> Self { + Self { inner } + } + + /// Get the inner value, without consuming the `Unused` wrapper. + /// + /// # Safety + /// + /// The user need to make sure that the value stays unused. + pub unsafe fn get_unchecked(&self) -> &T { + &self.inner + } + + /// Unwraps the value, consuming the `Unused` wrapper. + pub fn into(self) -> T { + self.inner } } @@ -184,22 +209,22 @@ impl StatsType { } impl TaskIdProvider for &dyn TurboTasksBackendApi { - fn get_fresh_task_id(&self) -> TaskId { + fn get_fresh_task_id(&self) -> Unused { (*self).get_fresh_task_id() } - unsafe fn reuse_task_id(&self, id: TaskId) { - unsafe { (*self).reuse_task_id(id) } + fn reuse_task_id(&self, id: Unused) { + (*self).reuse_task_id(id) } } impl TaskIdProvider for &dyn TaskIdProvider { - fn get_fresh_task_id(&self) -> TaskId { + fn get_fresh_task_id(&self) -> Unused { (*self).get_fresh_task_id() } - unsafe fn reuse_task_id(&self, id: TaskId) { - unsafe { (*self).reuse_task_id(id) } + fn reuse_task_id(&self, id: Unused) { + (*self).reuse_task_id(id) } } @@ -400,18 +425,16 @@ impl TurboTasks { if this.stopped.load(Ordering::Acquire) { return false; } - if let Some(execution) = this.backend.try_start_task_execution(task_id, &*this) - { - // Setup thread locals - let (result, duration, instant) = CELL_COUNTERS - .scope(Default::default(), async { - let (result, duration, instant) = TimedFuture::new( - AssertUnwindSafe(execution.future).catch_unwind(), - ) - .await; - (result, duration, instant) - }) - .await; + + // Setup thread locals + let execution_future = CELL_COUNTERS.scope(Default::default(), async { + let execution = this.backend.try_start_task_execution(task_id, &*this)?; + Some( + TimedFuture::new(AssertUnwindSafe(execution.future).catch_unwind()) + .await, + ) + }); + if let Some((result, duration, instant)) = execution_future.await { if cfg!(feature = "log_function_stats") && duration.as_millis() > 1000 { println!( "{} took {}", @@ -811,12 +834,8 @@ impl TurboTasksApi for TurboTasks { .try_read_own_task_cell_untracked(current_task, index, self) } - fn try_read_task_collectibles( - &self, - task: TaskId, - trait_id: TraitTypeId, - ) -> Result, EventListener>> { - self.backend.try_read_task_collectibles( + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> RawVcSetVc { + self.backend.read_task_collectibles( task, trait_id, current_task("reading collectibles"), @@ -975,12 +994,13 @@ impl TurboTasksBackendApi for TurboTasks { } impl TaskIdProvider for TurboTasks { - fn get_fresh_task_id(&self) -> TaskId { - self.task_id_factory.get() + fn get_fresh_task_id(&self) -> Unused { + // Safety: This is a fresh id from the factory + unsafe { Unused::new_unchecked(self.task_id_factory.get()) } } - unsafe fn reuse_task_id(&self, id: TaskId) { - unsafe { self.task_id_factory.reuse(id) } + fn reuse_task_id(&self, id: Unused) { + unsafe { self.task_id_factory.reuse(id.into()) } } } @@ -1235,19 +1255,6 @@ pub(crate) async fn read_task_cell_untracked( } } -pub(crate) async fn read_task_collectibles( - this: &dyn TurboTasksApi, - id: TaskId, - trait_id: TraitTypeId, -) -> Result> { - loop { - match this.try_read_task_collectibles(id, trait_id)? { - Ok(result) => return Ok(result), - Err(listener) => listener.await, - } - } -} - pub struct CurrentCellRef { current_task: TaskId, index: CellId, diff --git a/crates/turbo-tasks/src/raw_vc.rs b/crates/turbo-tasks/src/raw_vc.rs index 192258cb9ef8d..509955726d7cb 100644 --- a/crates/turbo-tasks/src/raw_vc.rs +++ b/crates/turbo-tasks/src/raw_vc.rs @@ -1,7 +1,7 @@ use std::{ any::Any, fmt::{Debug, Display}, - future::{Future, IntoFuture}, + future::Future, hash::Hash, marker::PhantomData, pin::Pin, @@ -335,15 +335,10 @@ impl CollectiblesSource for RawVc { fn peek_collectibles(self) -> CollectiblesFuture { let tt = turbo_tasks(); tt.notify_scheduled_tasks(); - let set: RawVcSetVc = tt - .native_call( - *crate::collectibles::READ_COLLECTIBLES_FUNCTION_ID, - vec![self.into(), (*T::get_trait_type_id()).into()], - ) - .into(); + let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.into_future(), + inner: set.strongly_consistent(), take: false, phantom: PhantomData, } @@ -352,15 +347,10 @@ impl CollectiblesSource for RawVc { fn take_collectibles(self) -> CollectiblesFuture { let tt = turbo_tasks(); tt.notify_scheduled_tasks(); - let set: RawVcSetVc = tt - .native_call( - *crate::collectibles::READ_COLLECTIBLES_FUNCTION_ID, - vec![self.into(), (*T::get_trait_type_id()).into()], - ) - .into(); + let set: RawVcSetVc = tt.read_task_collectibles(self.get_task_id(), T::get_trait_type_id()); CollectiblesFuture { turbo_tasks: tt, - inner: set.into_future(), + inner: set.strongly_consistent(), take: true, phantom: PhantomData, } diff --git a/crates/turbopack-dev-server/src/source/combined.rs b/crates/turbopack-dev-server/src/source/combined.rs index 300fc76f40af7..801e02cb5ed77 100644 --- a/crates/turbopack-dev-server/src/source/combined.rs +++ b/crates/turbopack-dev-server/src/source/combined.rs @@ -102,8 +102,12 @@ impl PausableCombinedContentSource { // we've skipped to exactly the source which requested data. Requery the source // with it's partially computed path and needed data. let result = match pending.take() { - Some(pending) => pending.source.get(&pending.path, mem::take(&mut data)), - None => source.get(path, Default::default()), + Some(pending) => pending + .source + .resolve() + .await? + .get(&pending.path, mem::take(&mut data)), + None => source.resolve().await?.get(path, Default::default()), }; let res = result.await?; diff --git a/crates/turbopack-dev-server/src/source/resolve.rs b/crates/turbopack-dev-server/src/source/resolve.rs index 96e827842382a..9dc0e241350f2 100644 --- a/crates/turbopack-dev-server/src/source/resolve.rs +++ b/crates/turbopack-dev-server/src/source/resolve.rs @@ -77,7 +77,7 @@ pub async fn resolve_source_request( let new_asset_path = urlencoding::decode(&new_uri.path()[1..])?.into_owned(); - current_source = new_source; + current_source = new_source.resolve().await?; request_overwrites.uri = new_uri; current_asset_path = new_asset_path; data = ContentSourceData::default(); diff --git a/crates/turbopack-dev-server/src/source/router.rs b/crates/turbopack-dev-server/src/source/router.rs index 3b9e8a49ad1a3..c3f7e5f0bbe8e 100644 --- a/crates/turbopack-dev-server/src/source/router.rs +++ b/crates/turbopack-dev-server/src/source/router.rs @@ -28,9 +28,13 @@ impl RouterContentSource { #[turbo_tasks::value_impl] impl ContentSource for RouterContentSource { #[turbo_tasks::function] - fn get(&self, path: &str, data: Value) -> ContentSourceResultVc { + async fn get( + &self, + path: &str, + data: Value, + ) -> Result { let (source, path) = self.get_source(path); - source.get(path, data) + Ok(source.resolve().await?.get(path, data)) } #[turbo_tasks::function]