Commit
Merge 4494954 into 8cfc41d
sokra authored Feb 17, 2023
2 parents 8cfc41d + 4494954 commit cdaea61
Showing 15 changed files with 424 additions and 273 deletions.
2 changes: 2 additions & 0 deletions crates/turbo-tasks-memory/Cargo.toml
@@ -44,6 +44,8 @@ log_connect_tasks = []
report_expensive = []
print_scope_updates = []
print_task_invalidation = []
inline_add_to_scope = []
inline_remove_from_scope = []

[[bench]]
name = "mod"
5 changes: 5 additions & 0 deletions crates/turbo-tasks-memory/src/count_hash_set.rs
@@ -107,6 +107,11 @@ impl<T: Eq + Hash, H: BuildHasher + Default> CountHashSet<T, H> {
self.add_count(item, 1)
}

/// Returns the current count of an item
pub fn get(&self, item: &T) -> isize {
*self.inner.get(item).unwrap_or(&0)
}

/// Returns true when the value is no longer visible from outside
pub fn remove_count(&mut self, item: T, count: usize) -> bool {
match self.inner.entry(item) {
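
The new `get` accessor exposes the per-item count directly. A minimal usage sketch, not part of the diff — the `new()` constructor, the `add` helper (which wraps `add_count(item, 1)`), and a defaulted hasher parameter are assumed from the surrounding impl rather than shown in this hunk:

    // Illustrative sketch only; CountHashSet is internal to turbo-tasks-memory.
    let mut set = CountHashSet::<u32>::new();   // assumed constructor
    set.add(42);                                // count for 42 goes from 0 to 1
    assert_eq!(set.get(&42), 1);                // new in this commit: read the current count
    set.remove_count(42, 1);                    // true once the item is no longer visible from outside
    assert_eq!(set.get(&42), 0);                // absent items report a count of 0
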
167 changes: 127 additions & 40 deletions crates/turbo-tasks-memory/src/memory_backend.rs
@@ -1,10 +1,10 @@
use std::{
borrow::Cow,
borrow::{Borrow, Cow},
cell::RefCell,
cmp::min,
collections::VecDeque,
future::Future,
hash::BuildHasherDefault,
hash::{BuildHasher, BuildHasherDefault, Hash},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
@@ -24,6 +24,7 @@ use turbo_tasks::{
TransientTaskType,
},
event::EventListener,
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
};
Expand All @@ -48,6 +49,8 @@ pub struct MemoryBackend {
backend_jobs: NoMoveVec<Job>,
backend_job_id_factory: IdFactory<BackendJobId>,
task_cache: DashMap<Arc<PersistentTaskType>, TaskId, BuildHasherDefault<FxHasher>>,
read_collectibles_task_cache:
DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault<FxHasher>>,
memory_limit: usize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
@@ -76,6 +79,7 @@ impl MemoryBackend {
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactory::new(),
task_cache: DashMap::default(),
read_collectibles_task_cache: DashMap::default(),
memory_limit,
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
@@ -121,12 +125,22 @@ impl MemoryBackend {
for id in self.task_cache.clone().into_read_only().values() {
func(*id);
}
for id in self
.read_collectibles_task_cache
.clone()
.into_read_only()
.values()
{
func(*id);
}
}

#[inline(always)]
pub fn with_task<T>(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T {
func(self.memory_tasks.get(*id).unwrap())
}

#[inline(always)]
pub fn with_scope<T>(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T {
func(self.memory_task_scopes.get(*id).unwrap())
}
@@ -261,6 +275,97 @@ impl MemoryBackend {
}
}
}

pub(crate) fn get_or_create_read_collectibles_task(
&self,
scope_id: TaskScopeId,
trait_type: TraitTypeId,
parent_task: TaskId,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
if let Some(task) = self.lookup_and_connect_task(
parent_task,
&self.read_collectibles_task_cache,
&(scope_id, trait_type),
turbo_tasks,
) {
// fast path without creating a new task
task
} else {
// slow path with key lock
let id = turbo_tasks.get_fresh_task_id();
let task =
Task::new_read_collectibles(id, scope_id, trait_type, turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.insert_and_connect_fresh_task(
parent_task,
&self.read_collectibles_task_cache,
(scope_id, trait_type),
id,
task,
root_scoped,
turbo_tasks,
)
}
}
}

/// # Safety
///
/// `new_id` must be an unused task id
unsafe fn insert_and_connect_fresh_task<K: Eq + Hash, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: K,
new_id: TaskId,
task: Task,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
// Safety: We have a fresh task id that nobody knows about yet
let task = unsafe { self.memory_tasks.insert(*new_id, task) };
if root_scoped {
task.make_root_scoped(self, turbo_tasks);
}
let result_task = match task_cache.entry(key) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(new_id);
new_id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*new_id);
turbo_tasks.reuse_task_id(new_id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
}

fn lookup_and_connect_task<K: Hash + Eq, Q, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: &Q,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Option<TaskId>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
task_cache.get(key).map(|task| {
self.connect_task_child(parent_task, *task, turbo_tasks);

*task
})
}
}
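
The two helpers above factor out the cache-or-create pattern that `get_or_create_task` already used: probe the cache and connect the parent on a hit, otherwise create the task under a fresh id and, if another thread cached the same key first, drop the fresh task and reuse the existing id. A standalone sketch of that insert-or-reuse step against a plain DashMap — the function and types below are illustrative stand-ins, not the backend's API:

    // Sketch under assumptions: `u32` stands in for TaskId, String for the cache key.
    use dashmap::{mapref::entry::Entry, DashMap};

    fn insert_or_reuse(cache: &DashMap<String, u32>, key: String, fresh_id: u32) -> u32 {
        match cache.entry(key) {
            // Most likely case: no concurrent insert, so the fresh id becomes the cached one.
            Entry::Vacant(entry) => {
                entry.insert(fresh_id);
                fresh_id
            }
            // Another thread cached the key first: discard the fresh id and reuse theirs
            // (the real backend also removes the freshly inserted task and recycles its id).
            Entry::Occupied(entry) => *entry.get(),
        }
    }
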

impl Backend for MemoryBackend {
@@ -445,15 +550,15 @@ impl Backend for MemoryBackend {
})
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
id: TaskId,
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
self.with_task(id, |task| {
task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks)
task.read_task_collectibles(reader, trait_id, self, turbo_tasks)
})
}

@@ -519,12 +624,10 @@ impl Backend for MemoryBackend {
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) {
if let Some(task) =
self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks)
{
// fast path without creating a new task
self.connect_task_child(parent_task, task, turbo_tasks);

// TODO maybe force (background) scheduling to avoid inactive tasks hanging in
// "in progress" until they become active
task
} else {
// It's important to avoid overallocating memory as this will go into the task
@@ -536,27 +639,17 @@ impl Backend for MemoryBackend {
let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.insert(*id, task);
self.insert_and_connect_fresh_task(
parent_task,
&self.task_cache,
task_type,
id,
task,
false,
turbo_tasks,
)
}
let result_task = match self.task_cache.entry(task_type) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(id);
id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*id);
turbo_tasks.reuse_task_id(id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
};
result
}
}

fn create_transient_task(
Expand Down Expand Up @@ -591,7 +684,7 @@ pub(crate) enum Job {
ScheduleWhenDirtyFromScope(AutoSet<TaskId>),
/// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to
/// split off work.
AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool),
AddToScopeQueue(VecDeque<TaskId>, TaskScopeId, usize),
/// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to
/// split off work.
RemoveFromScopeQueue(VecDeque<TaskId>, TaskScopeId),
@@ -618,7 +711,7 @@ impl Job {
Job::RemoveFromScopes(tasks, scopes) => {
for task in tasks {
backend.with_task(task, |task| {
task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks)
task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks)
});
}
backend.scope_add_remove_priority.finish_high();
@@ -638,17 +731,11 @@ impl Job {
})
}
}
Job::AddToScopeQueue(queue, id, is_optimization_scope) => {
Job::AddToScopeQueue(queue, id, merging_scopes) => {
backend
.scope_add_remove_priority
.run_low(async {
run_add_to_scope_queue(
queue,
id,
is_optimization_scope,
backend,
turbo_tasks,
);
run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks);
})
.await;
}
5 changes: 3 additions & 2 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
@@ -26,6 +26,7 @@ use turbo_tasks::{
ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph,
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
};
@@ -1395,13 +1396,13 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
}
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
_task: TaskId,
_trait_id: TraitTypeId,
_reader: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
todo!()
}

(Diffs for the remaining changed files are not shown.)
