Skip to content

Commit

Permalink
improve performance of TaskScopes and collectibles (vercel/turborepo#3849
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sokra authored Feb 17, 2023
1 parent 9c1faed commit 91e44b5
Show file tree
Hide file tree
Showing 16 changed files with 509 additions and 298 deletions.
2 changes: 2 additions & 0 deletions crates/turbo-tasks-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ log_connect_tasks = []
report_expensive = []
print_scope_updates = []
print_task_invalidation = []
inline_add_to_scope = []
inline_remove_from_scope = []

[[bench]]
name = "mod"
Expand Down
5 changes: 5 additions & 0 deletions crates/turbo-tasks-memory/src/count_hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ impl<T: Eq + Hash, H: BuildHasher + Default> CountHashSet<T, H> {
self.add_count(item, 1)
}

/// Returns the count currently recorded for `item`, or 0 when the item
/// has never been added (or has been fully removed).
pub fn get(&self, item: &T) -> isize {
self.inner.get(item).copied().unwrap_or(0)
}

/// Returns true when the value is no longer visible from outside
pub fn remove_count(&mut self, item: T, count: usize) -> bool {
match self.inner.entry(item) {
Expand Down
193 changes: 148 additions & 45 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
borrow::Cow,
borrow::{Borrow, Cow},
cell::RefCell,
cmp::min,
collections::VecDeque,
future::Future,
hash::BuildHasherDefault,
hash::{BuildHasher, BuildHasherDefault, Hash},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -24,8 +24,9 @@ use turbo_tasks::{
TransientTaskType,
},
event::EventListener,
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused,
};

use crate::{
Expand All @@ -48,6 +49,8 @@ pub struct MemoryBackend {
backend_jobs: NoMoveVec<Job>,
backend_job_id_factory: IdFactory<BackendJobId>,
task_cache: DashMap<Arc<PersistentTaskType>, TaskId, BuildHasherDefault<FxHasher>>,
read_collectibles_task_cache:
DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault<FxHasher>>,
memory_limit: usize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
Expand Down Expand Up @@ -76,6 +79,7 @@ impl MemoryBackend {
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactory::new(),
task_cache: DashMap::default(),
read_collectibles_task_cache: DashMap::default(),
memory_limit,
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
Expand Down Expand Up @@ -121,12 +125,22 @@ impl MemoryBackend {
for id in self.task_cache.clone().into_read_only().values() {
func(*id);
}
for id in self
.read_collectibles_task_cache
.clone()
.into_read_only()
.values()
{
func(*id);
}
}

#[inline(always)]
pub fn with_task<T>(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T {
func(self.memory_tasks.get(*id).unwrap())
}

#[inline(always)]
pub fn with_scope<T>(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T {
func(self.memory_task_scopes.get(*id).unwrap())
}
Expand Down Expand Up @@ -261,6 +275,99 @@ impl MemoryBackend {
}
}
}

/// Returns the task that reads the collectibles of `trait_type` from the
/// scope `scope_id`, creating and caching it on first use. The returned
/// task is always connected as a child of `parent_task`.
pub(crate) fn get_or_create_read_collectibles_task(
&self,
scope_id: TaskScopeId,
trait_type: TraitTypeId,
parent_task: TaskId,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let key = (scope_id, trait_type);
// Fast path: a task for this (scope, trait) pair is already cached.
if let Some(existing) = self.lookup_and_connect_task(
parent_task,
&self.read_collectibles_task_cache,
&key,
turbo_tasks,
) {
return existing;
}
// Slow path: create a fresh task and race to insert it into the cache.
let id = turbo_tasks.get_fresh_task_id();
let task = Task::new_read_collectibles(
// Safety: That task will hold the value, but we are still in
// control of the task
*unsafe { id.get_unchecked() },
scope_id,
trait_type,
turbo_tasks.stats_type(),
);
self.insert_and_connect_fresh_task(
parent_task,
&self.read_collectibles_task_cache,
key,
id,
task,
root_scoped,
turbo_tasks,
)
}

/// Inserts a freshly created `task` under the unused id `new_id` and
/// registers it in `task_cache` under `key`. If another thread has
/// already cached a task for the same key, the freshly created task is
/// discarded and its id recycled. Either way, the winning task is
/// connected as a child of `parent_task` and its id is returned.
fn insert_and_connect_fresh_task<K: Eq + Hash, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: K,
new_id: Unused<TaskId>,
task: Task,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let new_id = new_id.into();
// Safety: We have a fresh task id that nobody knows about yet
let task = unsafe { self.memory_tasks.insert(*new_id, task) };
if root_scoped {
task.make_root_scoped(self, turbo_tasks);
}
let result_task = match task_cache.entry(key) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(new_id);
new_id
}
Entry::Occupied(entry) => {
// Lost the race: another thread cached a task for this key
// first. Remove our task and give the id back to the pool.
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*new_id);
let new_id = Unused::new_unchecked(new_id);
turbo_tasks.reuse_task_id(new_id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
}

/// Looks up `key` in `task_cache`; when a cached task exists it is
/// connected as a child of `parent_task` and its id returned, otherwise
/// `None`.
fn lookup_and_connect_task<K: Hash + Eq, Q, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: &Q,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Option<TaskId>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// Cache miss short-circuits here; on a hit the map guard stays alive
// (as in the closure form) while the child edge is established.
let cached = task_cache.get(key)?;
let task_id = *cached;
self.connect_task_child(parent_task, task_id, turbo_tasks);
Some(task_id)
}
}

impl Backend for MemoryBackend {
Expand Down Expand Up @@ -445,15 +552,15 @@ impl Backend for MemoryBackend {
})
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
id: TaskId,
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
self.with_task(id, |task| {
task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks)
task.read_task_collectibles(reader, trait_id, self, turbo_tasks)
})
}

Expand Down Expand Up @@ -519,12 +626,10 @@ impl Backend for MemoryBackend {
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) {
if let Some(task) =
self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks)
{
// fast pass without creating a new task
self.connect_task_child(parent_task, task, turbo_tasks);

// TODO maybe force (background) scheduling to avoid inactive tasks hanging in
// "in progress" until they become active
task
} else {
// It's important to avoid overallocating memory as this will go into the task
Expand All @@ -533,30 +638,23 @@ impl Backend for MemoryBackend {
let task_type = Arc::new(task_type);
// slow pass with key lock
let id = turbo_tasks.get_fresh_task_id();
let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.insert(*id, task);
}
let result_task = match self.task_cache.entry(task_type) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(id);
id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*id);
turbo_tasks.reuse_task_id(id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
};
result
let task = Task::new_persistent(
// Safety: That task will hold the value, but we are still in
// control of the task
*unsafe { id.get_unchecked() },
task_type.clone(),
turbo_tasks.stats_type(),
);
self.insert_and_connect_fresh_task(
parent_task,
&self.task_cache,
task_type,
id,
task,
false,
turbo_tasks,
)
}
}

fn create_transient_task(
Expand All @@ -572,6 +670,7 @@ impl Backend for MemoryBackend {
scope.increment_unfinished_tasks(self);
});
let stats_type = turbo_tasks.stats_type();
let id = id.into();
let task = match task_type {
TransientTaskType::Root(f) => Task::new_root(id, scope, move || f() as _, stats_type),
TransientTaskType::Once(f) => Task::new_once(id, scope, f, stats_type),
Expand All @@ -591,7 +690,13 @@ pub(crate) enum Job {
ScheduleWhenDirtyFromScope(AutoSet<TaskId>),
/// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to
/// split off work.
AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool),
AddToScopeQueue {
queue: VecDeque<TaskId>,
scope: TaskScopeId,
/// Number of scopes that are currently being merged into this scope.
/// This information is only used for optimization.
merging_scopes: usize,
},
/// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to
/// split off work.
RemoveFromScopeQueue(VecDeque<TaskId>, TaskScopeId),
Expand All @@ -618,7 +723,7 @@ impl Job {
Job::RemoveFromScopes(tasks, scopes) => {
for task in tasks {
backend.with_task(task, |task| {
task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks)
task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks)
});
}
backend.scope_add_remove_priority.finish_high();
Expand All @@ -638,17 +743,15 @@ impl Job {
})
}
}
Job::AddToScopeQueue(queue, id, is_optimization_scope) => {
Job::AddToScopeQueue {
queue,
scope,
merging_scopes,
} => {
backend
.scope_add_remove_priority
.run_low(async {
run_add_to_scope_queue(
queue,
id,
is_optimization_scope,
backend,
turbo_tasks,
);
run_add_to_scope_queue(queue, scope, merging_scopes, backend, turbo_tasks);
})
.await;
}
Expand Down
13 changes: 10 additions & 3 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use turbo_tasks::{
ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph,
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused,
};

type RootTaskFn =
Expand Down Expand Up @@ -1395,13 +1396,13 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
}
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
_task: TaskId,
_trait_id: TraitTypeId,
_reader: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
todo!()
}

Expand Down Expand Up @@ -1484,6 +1485,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
}

let task = turbo_tasks.get_fresh_task_id();
let task = task.into();
let new_task = Task {
active_parents: AtomicU32::new(1),
task_state: Mutex::new(TaskState {
Expand All @@ -1505,6 +1507,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
// SAFETY: We are still the only owner of this task and id
unsafe {
self.tasks.remove(*task);
let task = Unused::new_unchecked(task);
turbo_tasks.reuse_task_id(task);
}
self.connect(parent_task, existing_task, turbo_tasks);
Expand All @@ -1528,6 +1531,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let task = turbo_tasks.get_fresh_task_id();
let task = task.into();
let new_task = Task {
active_parents: AtomicU32::new(1),
task_state: Mutex::new(TaskState {
Expand Down Expand Up @@ -1567,14 +1571,17 @@ impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi
task_type: TaskType::Persistent(task_type.clone()),
};
let task = self.turbo_tasks.get_fresh_task_id();
let task = task.into();
// SAFETY: It's a fresh task id
unsafe {
self.backend.tasks.insert(*task, new_task);
}
match cache.entry(task_type) {
Entry::Occupied(e) => {
let value = *e.into_ref();
// Safety: We didn't store the task id in the cache, we it's still unused
unsafe {
let task = Unused::new_unchecked(task);
self.turbo_tasks.reuse_task_id(task);
}
value
Expand Down
Loading

0 comments on commit 91e44b5

Please sign in to comment.