Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve performance of TaskScopes and collectibles #3849

Merged
merged 18 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/turbo-tasks-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ log_connect_tasks = []
report_expensive = []
print_scope_updates = []
print_task_invalidation = []
inline_add_to_scope = []
inline_remove_from_scope = []

[[bench]]
name = "mod"
Expand Down
5 changes: 5 additions & 0 deletions crates/turbo-tasks-memory/src/count_hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ impl<T: Eq + Hash, H: BuildHasher + Default> CountHashSet<T, H> {
self.add_count(item, 1)
}

/// Returns the current count recorded for `item`, or 0 when the item is not
/// present in the set.
pub fn get(&self, item: &T) -> isize {
    self.inner.get(item).copied().unwrap_or(0)
}

/// Returns true when the value is no longer visible from outside
pub fn remove_count(&mut self, item: T, count: usize) -> bool {
match self.inner.entry(item) {
Expand Down
193 changes: 148 additions & 45 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
borrow::Cow,
borrow::{Borrow, Cow},
cell::RefCell,
cmp::min,
collections::VecDeque,
future::Future,
hash::BuildHasherDefault,
hash::{BuildHasher, BuildHasherDefault, Hash},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -24,8 +24,9 @@ use turbo_tasks::{
TransientTaskType,
},
event::EventListener,
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused,
};

use crate::{
Expand All @@ -48,6 +49,8 @@ pub struct MemoryBackend {
backend_jobs: NoMoveVec<Job>,
backend_job_id_factory: IdFactory<BackendJobId>,
task_cache: DashMap<Arc<PersistentTaskType>, TaskId, BuildHasherDefault<FxHasher>>,
read_collectibles_task_cache:
DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault<FxHasher>>,
memory_limit: usize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
Expand Down Expand Up @@ -76,6 +79,7 @@ impl MemoryBackend {
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactory::new(),
task_cache: DashMap::default(),
read_collectibles_task_cache: DashMap::default(),
memory_limit,
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
Expand Down Expand Up @@ -121,12 +125,22 @@ impl MemoryBackend {
for id in self.task_cache.clone().into_read_only().values() {
func(*id);
}
for id in self
.read_collectibles_task_cache
.clone()
.into_read_only()
.values()
{
func(*id);
}
}

#[inline(always)]
pub fn with_task<T>(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T {
    // The task must exist for any id handed out by the backend.
    let task = self.memory_tasks.get(*id).unwrap();
    func(task)
}

#[inline(always)]
pub fn with_scope<T>(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T {
    // The scope must exist for any id handed out by the backend.
    let scope = self.memory_task_scopes.get(*id).unwrap();
    func(scope)
}
Expand Down Expand Up @@ -261,6 +275,99 @@ impl MemoryBackend {
}
}
}

/// Returns the id of the task that reads collectibles of `trait_type` from
/// the scope `scope_id`, connecting it as a child of `parent_task`.
///
/// Fast path: the task is looked up in `read_collectibles_task_cache` (keyed
/// by `(scope_id, trait_type)`). Slow path: a fresh task id is allocated, a
/// `Task::new_read_collectibles` task is built for it, and both are handed to
/// `insert_and_connect_fresh_task`, which reconciles a concurrent inserter
/// racing on the same cache key.
pub(crate) fn get_or_create_read_collectibles_task(
&self,
scope_id: TaskScopeId,
trait_type: TraitTypeId,
parent_task: TaskId,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
if let Some(task) = self.lookup_and_connect_task(
parent_task,
&self.read_collectibles_task_cache,
&(scope_id, trait_type),
turbo_tasks,
) {
// fast pass without creating a new task
task
} else {
// slow pass with key lock
let id = turbo_tasks.get_fresh_task_id();
let task = Task::new_read_collectibles(
// Safety: The task we construct here will hold on to the id, but the
// id is still unused and fully under our control at this point.
*unsafe { id.get_unchecked() },
scope_id,
trait_type,
turbo_tasks.stats_type(),
);
self.insert_and_connect_fresh_task(
parent_task,
&self.read_collectibles_task_cache,
(scope_id, trait_type),
id,
task,
root_scoped,
turbo_tasks,
)
}
}

/// Registers a freshly created `task` under the unused id `new_id`, publishes
/// it in `task_cache` under `key`, and connects the resulting task as a child
/// of `parent_task`.
///
/// The task is speculatively inserted into `memory_tasks` before taking the
/// cache entry lock. If another thread won the race and the cache entry is
/// already occupied, the speculative insertion is rolled back (the task is
/// removed again and the id is returned for reuse) and the winner's task id
/// is used instead.
fn insert_and_connect_fresh_task<K: Eq + Hash, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: K,
new_id: Unused<TaskId>,
task: Task,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let new_id = new_id.into();
// Safety: We have a fresh task id that nobody knows about yet
let task = unsafe { self.memory_tasks.insert(*new_id, task) };
if root_scoped {
task.make_root_scoped(self, turbo_tasks);
}
let result_task = match task_cache.entry(key) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(new_id);
new_id
}
Entry::Occupied(entry) => {
// Another thread inserted a task for the same key first; undo our
// speculative insertion and hand the unused id back for reuse.
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*new_id);
let new_id = Unused::new_unchecked(new_id);
turbo_tasks.reuse_task_id(new_id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
}

/// Looks up `key` in `task_cache`. On a hit, connects the cached task as a
/// child of `parent_task` and returns its id; on a miss, returns `None`.
fn lookup_and_connect_task<K: Hash + Eq, Q, H: BuildHasher + Clone>(
    &self,
    parent_task: TaskId,
    task_cache: &DashMap<K, TaskId, H>,
    key: &Q,
    turbo_tasks: &dyn TurboTasksBackendApi,
) -> Option<TaskId>
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    // The map guard stays alive until the end of this scope, matching the
    // original guard lifetime across the connect call.
    let cached = task_cache.get(key)?;
    let task = *cached;
    self.connect_task_child(parent_task, task, turbo_tasks);
    Some(task)
}
}

impl Backend for MemoryBackend {
Expand Down Expand Up @@ -445,15 +552,15 @@ impl Backend for MemoryBackend {
})
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
id: TaskId,
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
self.with_task(id, |task| {
task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks)
task.read_task_collectibles(reader, trait_id, self, turbo_tasks)
})
}

Expand Down Expand Up @@ -519,12 +626,10 @@ impl Backend for MemoryBackend {
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) {
if let Some(task) =
self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks)
{
// fast pass without creating a new task
self.connect_task_child(parent_task, task, turbo_tasks);

// TODO maybe force (background) scheduling to avoid inactive tasks hanging in
// "in progress" until they become active
task
} else {
// It's important to avoid overallocating memory as this will go into the task
Expand All @@ -533,30 +638,23 @@ impl Backend for MemoryBackend {
let task_type = Arc::new(task_type);
// slow pass with key lock
let id = turbo_tasks.get_fresh_task_id();
let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.insert(*id, task);
}
let result_task = match self.task_cache.entry(task_type) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(id);
id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*id);
turbo_tasks.reuse_task_id(id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
};
result
let task = Task::new_persistent(
// Safety: That task will hold the value, but we are still in
// control of the task
*unsafe { id.get_unchecked() },
task_type.clone(),
turbo_tasks.stats_type(),
);
self.insert_and_connect_fresh_task(
parent_task,
&self.task_cache,
task_type,
id,
task,
false,
turbo_tasks,
)
}
}

fn create_transient_task(
Expand All @@ -572,6 +670,7 @@ impl Backend for MemoryBackend {
scope.increment_unfinished_tasks(self);
});
let stats_type = turbo_tasks.stats_type();
let id = id.into();
let task = match task_type {
TransientTaskType::Root(f) => Task::new_root(id, scope, move || f() as _, stats_type),
TransientTaskType::Once(f) => Task::new_once(id, scope, f, stats_type),
Expand All @@ -591,7 +690,13 @@ pub(crate) enum Job {
ScheduleWhenDirtyFromScope(AutoSet<TaskId>),
/// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to
/// split off work.
AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool),
AddToScopeQueue {
queue: VecDeque<TaskId>,
scope: TaskScopeId,
/// Number of scopes that are currently being merged into this scope.
/// This information is only used for optimization.
merging_scopes: usize,
},
/// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to
/// split off work.
RemoveFromScopeQueue(VecDeque<TaskId>, TaskScopeId),
Expand All @@ -618,7 +723,7 @@ impl Job {
Job::RemoveFromScopes(tasks, scopes) => {
for task in tasks {
backend.with_task(task, |task| {
task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks)
task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks)
});
}
backend.scope_add_remove_priority.finish_high();
Expand All @@ -638,17 +743,15 @@ impl Job {
})
}
}
Job::AddToScopeQueue(queue, id, is_optimization_scope) => {
Job::AddToScopeQueue {
queue,
scope,
merging_scopes,
} => {
backend
.scope_add_remove_priority
.run_low(async {
run_add_to_scope_queue(
queue,
id,
is_optimization_scope,
backend,
turbo_tasks,
);
run_add_to_scope_queue(queue, scope, merging_scopes, backend, turbo_tasks);
})
.await;
}
Expand Down
13 changes: 10 additions & 3 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use turbo_tasks::{
ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph,
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, Unused,
};

type RootTaskFn =
Expand Down Expand Up @@ -1395,13 +1396,13 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
}
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
_task: TaskId,
_trait_id: TraitTypeId,
_reader: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
todo!()
}

Expand Down Expand Up @@ -1484,6 +1485,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
}

let task = turbo_tasks.get_fresh_task_id();
let task = task.into();
let new_task = Task {
active_parents: AtomicU32::new(1),
task_state: Mutex::new(TaskState {
Expand All @@ -1505,6 +1507,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
// SAFETY: We are still the only owner of this task and id
unsafe {
self.tasks.remove(*task);
let task = Unused::new_unchecked(task);
turbo_tasks.reuse_task_id(task);
}
self.connect(parent_task, existing_task, turbo_tasks);
Expand All @@ -1528,6 +1531,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let task = turbo_tasks.get_fresh_task_id();
let task = task.into();
let new_task = Task {
active_parents: AtomicU32::new(1),
task_state: Mutex::new(TaskState {
Expand Down Expand Up @@ -1567,14 +1571,17 @@ impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi
task_type: TaskType::Persistent(task_type.clone()),
};
let task = self.turbo_tasks.get_fresh_task_id();
let task = task.into();
// SAFETY: It's a fresh task id
unsafe {
self.backend.tasks.insert(*task, new_task);
}
match cache.entry(task_type) {
Entry::Occupied(e) => {
let value = *e.into_ref();
// Safety: We didn't store the task id in the cache, so it's still unused
unsafe {
let task = Unused::new_unchecked(task);
self.turbo_tasks.reuse_task_id(task);
}
value
Expand Down
Loading