Commit 109df2c

Merge d08ee17 into c2dc79a
sokra authored Feb 17, 2023
2 parents c2dc79a + d08ee17 commit 109df2c
Showing 16 changed files with 441 additions and 284 deletions.
6 changes: 6 additions & 0 deletions crates/next-core/src/app_source.rs
@@ -505,6 +505,12 @@ async fn create_app_source_for_directory(
);
}

let sources = sources
.into_iter()
.map(|source| source.resolve())
.try_join()
.await?;

Ok(CombinedContentSource { sources }.cell())
}
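The added block resolves each source before storing it, so the CombinedContentSource cell holds resolved Vcs rather than lazy ones. The `.map(...).try_join().await?` chain comes from turbo-tasks' TryJoinIterExt: map every element to a future, await them all concurrently, and short-circuit on the first error. Below is a minimal standalone sketch of the same fan-out pattern using the futures crate instead of the turbo-tasks API; resolve_stub is a hypothetical stand-in for source.resolve(), and tokio/futures/anyhow are assumed dependencies.

use anyhow::Result;
use futures::future::try_join_all;

// hypothetical stand-in for `source.resolve()`
async fn resolve_stub(n: u32) -> Result<u32> {
    Ok(n * 2)
}

async fn resolve_all(sources: Vec<u32>) -> Result<Vec<u32>> {
    // map each element to a future, await them together,
    // and fail fast on the first error — the shape of `try_join`
    try_join_all(sources.into_iter().map(resolve_stub)).await
}

#[tokio::main]
async fn main() -> Result<()> {
    assert_eq!(resolve_all(vec![1, 2, 3]).await?, vec![2, 4, 6]);
    Ok(())
}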

16 changes: 10 additions & 6 deletions crates/next-core/src/page_source.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use turbo_tasks::{
primitives::{BoolVc, StringVc, StringsVc},
trace::TraceRawVcs,
Value,
TryJoinIterExt, Value,
};
use turbo_tasks_env::ProcessEnvVc;
use turbo_tasks_fs::{
@@ -287,10 +287,10 @@ pub async fn create_page_source(
Ok(CombinedContentSource {
sources: vec![
// Match _next/404 first to ensure rewrites work properly.
force_not_found_source,
page_source.into(),
fallback_source.into(),
fallback_not_found_source,
force_not_found_source.resolve().await?,
page_source.as_content_source().resolve().await?,
fallback_source.as_content_source().resolve().await?,
fallback_not_found_source.resolve().await?,
],
}
.cell()
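Note the shape of each entry above: the concrete source is first upcast to the ContentSource trait (as_content_source()), then resolved, so the vec stores one uniform trait type. A rough standalone analogue of that upcast-and-collect step, with hypothetical PageSource/FallbackSource types rather than the turbo-tasks API:

use std::sync::Arc;

trait ContentSource {
    fn describe(&self) -> String;
}

struct PageSource;
impl ContentSource for PageSource {
    fn describe(&self) -> String {
        "page".into()
    }
}

struct FallbackSource;
impl ContentSource for FallbackSource {
    fn describe(&self) -> String {
        "fallback".into()
    }
}

fn main() {
    // upcast each concrete source to the shared trait object before storing,
    // mirroring `page_source.as_content_source()` in the diff above
    let sources: Vec<Arc<dyn ContentSource>> =
        vec![Arc::new(PageSource), Arc::new(FallbackSource)];
    for source in &sources {
        println!("{}", source.describe());
    }
}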
@@ -656,7 +656,11 @@ async fn create_page_source_for_directory(
sources.sort_by_key(|(k, _)| *k);

Ok(CombinedContentSource {
sources: sources.into_iter().map(|(_, v)| v).collect(),
sources: sources
.into_iter()
.map(|(_, v)| v.resolve())
.try_join()
.await?,
}
.cell())
}
23 changes: 13 additions & 10 deletions crates/next-dev/src/lib.rs
@@ -351,7 +351,7 @@ async fn source(
turbo_tasks: turbo_tasks.into(),
}
.cell()
.into();
.as_content_source();
let static_source =
StaticAssetsContentSourceVc::new(String::new(), project_path.join("public")).into();
let manifest_source = DevManifestContentSource {
@@ -371,28 +371,31 @@
roots: HashSet::from([main_source.into()]),
}
.cell()
.into();
.as_content_source();
let main_source = main_source.into();
let source_maps = SourceMapContentSourceVc::new(main_source).into();
let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).into();
let source_maps = SourceMapContentSourceVc::new(main_source).as_content_source();
let source_map_trace = NextSourceMapTraceContentSourceVc::new(main_source).as_content_source();
let img_source = NextImageContentSourceVc::new(
CombinedContentSourceVc::new(vec![static_source, page_source]).into(),
)
.into();
.as_content_source();
let router_source =
NextRouterContentSourceVc::new(main_source, execution_context, next_config, server_addr)
.into();
let source = RouterContentSource {
routes: vec![
("__turbopack__/".to_string(), introspect),
("__turbo_tasks__/".to_string(), viz),
("__turbopack__/".to_string(), introspect.resolve().await?),
("__turbo_tasks__/".to_string(), viz.resolve().await?),
(
"__nextjs_original-stack-frame".to_string(),
source_map_trace,
source_map_trace.resolve().await?,
),
// TODO: Load path from next.config.js
("_next/image".to_string(), img_source),
("__turbopack_sourcemap__/".to_string(), source_maps),
("_next/image".to_string(), img_source.resolve().await?),
(
"__turbopack_sourcemap__/".to_string(),
source_maps.resolve().await?,
),
],
fallback: router_source,
}
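RouterContentSource dispatches requests by path prefix and falls back to router_source when nothing matches; the "_next/404 first" comment in page_source.rs above exists because order decides ties. A standalone sketch of first-match prefix dispatch with a hypothetical Router type, not the actual turbo-tasks implementation:

struct Router<T> {
    routes: Vec<(String, T)>,
    fallback: T,
}

impl<T> Router<T> {
    /// First matching prefix wins, so route order is significant.
    fn get(&self, path: &str) -> &T {
        self.routes
            .iter()
            .find(|(prefix, _)| path.starts_with(prefix))
            .map(|(_, target)| target)
            .unwrap_or(&self.fallback)
    }
}

fn main() {
    let router = Router {
        routes: vec![
            ("__turbopack__/".to_string(), "introspect"),
            ("_next/image".to_string(), "img"),
        ],
        fallback: "router",
    };
    assert_eq!(*router.get("_next/image?url=/a.png"), "img");
    assert_eq!(*router.get("/some/page"), "router");
}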
2 changes: 2 additions & 0 deletions crates/turbo-tasks-memory/Cargo.toml
@@ -44,6 +44,8 @@ log_connect_tasks = []
report_expensive = []
print_scope_updates = []
print_task_invalidation = []
inline_add_to_scope = []
inline_remove_from_scope = []

[[bench]]
name = "mod"
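The two new entries are plain opt-in cargo features with no dependencies; judging by their names they switch scope add/remove work from a deferred background job to inline execution, though the gating sites live elsewhere in the crate. A hedged sketch of how such a flag is typically consumed:

fn add_to_scope_inline_or_deferred() {
    #[cfg(feature = "inline_add_to_scope")]
    {
        // feature on: run the add-to-scope walk inline on the current thread
        println!("inline");
    }
    #[cfg(not(feature = "inline_add_to_scope"))]
    {
        // feature off (default): split the walk into a background job
        println!("deferred");
    }
}

fn main() {
    add_to_scope_inline_or_deferred();
}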
5 changes: 5 additions & 0 deletions crates/turbo-tasks-memory/src/count_hash_set.rs
@@ -107,6 +107,11 @@ impl<T: Eq + Hash, H: BuildHasher + Default> CountHashSet<T, H> {
self.add_count(item, 1)
}

/// Returns the current count of an item
pub fn get(&self, item: &T) -> isize {
*self.inner.get(item).unwrap_or(&0)
}

/// Returns true when the value is no longer visible from outside
pub fn remove_count(&mut self, item: T, count: usize) -> bool {
match self.inner.entry(item) {
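CountHashSet stores a signed count per item, and the new get returns that count, defaulting to 0 for items never added. A minimal standalone analogue of the counted-set shape (simplified: no negative-count bookkeeping and no custom BuildHasher parameter):

use std::collections::HashMap;
use std::hash::Hash;

#[derive(Default)]
struct CountHashSet<T> {
    inner: HashMap<T, isize>,
}

impl<T: Eq + Hash> CountHashSet<T> {
    fn add_count(&mut self, item: T, count: usize) {
        *self.inner.entry(item).or_insert(0) += count as isize;
    }

    /// Returns the current count of an item, 0 if it was never added.
    fn get(&self, item: &T) -> isize {
        *self.inner.get(item).unwrap_or(&0)
    }
}

fn main() {
    let mut set = CountHashSet::default();
    set.add_count("scope", 2);
    assert_eq!(set.get(&"scope"), 2);
    assert_eq!(set.get(&"missing"), 0);
}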
167 changes: 127 additions & 40 deletions crates/turbo-tasks-memory/src/memory_backend.rs
@@ -1,10 +1,10 @@
use std::{
borrow::Cow,
borrow::{Borrow, Cow},
cell::RefCell,
cmp::min,
collections::VecDeque,
future::Future,
hash::BuildHasherDefault,
hash::{BuildHasher, BuildHasherDefault, Hash},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
@@ -24,6 +24,7 @@ use turbo_tasks::{
TransientTaskType,
},
event::EventListener,
primitives::RawVcSetVc,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi,
};
@@ -48,6 +49,8 @@ pub struct MemoryBackend {
backend_jobs: NoMoveVec<Job>,
backend_job_id_factory: IdFactory<BackendJobId>,
task_cache: DashMap<Arc<PersistentTaskType>, TaskId, BuildHasherDefault<FxHasher>>,
read_collectibles_task_cache:
DashMap<(TaskScopeId, TraitTypeId), TaskId, BuildHasherDefault<FxHasher>>,
memory_limit: usize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
@@ -76,6 +79,7 @@ impl MemoryBackend {
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactory::new(),
task_cache: DashMap::default(),
read_collectibles_task_cache: DashMap::default(),
memory_limit,
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
@@ -121,12 +125,22 @@ impl MemoryBackend {
for id in self.task_cache.clone().into_read_only().values() {
func(*id);
}
for id in self
.read_collectibles_task_cache
.clone()
.into_read_only()
.values()
{
func(*id);
}
}

#[inline(always)]
pub fn with_task<T>(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T {
func(self.memory_tasks.get(*id).unwrap())
}

#[inline(always)]
pub fn with_scope<T>(&self, id: TaskScopeId, func: impl FnOnce(&TaskScope) -> T) -> T {
func(self.memory_task_scopes.get(*id).unwrap())
}
@@ -261,6 +275,97 @@ impl MemoryBackend {
}
}
}

pub(crate) fn get_or_create_read_collectibles_task(
&self,
scope_id: TaskScopeId,
trait_type: TraitTypeId,
parent_task: TaskId,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
if let Some(task) = self.lookup_and_connect_task(
parent_task,
&self.read_collectibles_task_cache,
&(scope_id, trait_type),
turbo_tasks,
) {
// fast path without creating a new task
task
} else {
// slow path with key lock
let id = turbo_tasks.get_fresh_task_id();
let task =
Task::new_read_collectibles(id, scope_id, trait_type, turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.insert_and_connect_fresh_task(
parent_task,
&self.read_collectibles_task_cache,
(scope_id, trait_type),
id,
task,
root_scoped,
turbo_tasks,
)
}
}
}

/// # Safety
///
/// `new_id` must be an unused task id
unsafe fn insert_and_connect_fresh_task<K: Eq + Hash, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: K,
new_id: TaskId,
task: Task,
root_scoped: bool,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
// Safety: We have a fresh task id that nobody knows about yet
let task = unsafe { self.memory_tasks.insert(*new_id, task) };
if root_scoped {
task.make_root_scoped(self, turbo_tasks);
}
let result_task = match task_cache.entry(key) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(new_id);
new_id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*new_id);
turbo_tasks.reuse_task_id(new_id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
}
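insert_and_connect_fresh_task generalizes the slow path that get_or_create_persistent_task used to inline: build the task under a fresh id without holding any lock, then race to publish it through the cache's entry API; the loser removes its copy and reuses the winner's id. A standalone sketch of that optimistic get-or-create pattern over DashMap (dashmap crate assumed; ids simplified to u32):

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

/// Get-or-create without holding a lock while the value is being built:
/// build first, then race to publish it.
fn get_or_create(cache: &DashMap<String, u32>, key: String, fresh_id: u32) -> u32 {
    match cache.entry(key) {
        Entry::Vacant(entry) => {
            // most likely case: we won the race, publish our fresh id
            entry.insert(fresh_id);
            fresh_id
        }
        Entry::Occupied(entry) => {
            // lost the race: discard our copy and reuse the existing id
            *entry.get()
        }
    }
}

fn main() {
    let cache = DashMap::new();
    assert_eq!(get_or_create(&cache, "scope".into(), 1), 1);
    // a second caller with the same key gets the first id back
    assert_eq!(get_or_create(&cache, "scope".into(), 2), 1);
}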

fn lookup_and_connect_task<K: Hash + Eq, Q, H: BuildHasher + Clone>(
&self,
parent_task: TaskId,
task_cache: &DashMap<K, TaskId, H>,
key: &Q,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Option<TaskId>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
task_cache.get(key).map(|task| {
self.connect_task_child(parent_task, *task, turbo_tasks);

*task
})
}
}
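lookup_and_connect_task uses the standard library's Borrow<Q> idiom: the cache is keyed by an owned K (an Arc'd task type or a tuple), but lookups accept any borrowed form Q, so probing never has to clone or allocate a key. A minimal sketch of the same bound on a plain HashMap:

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;

/// Probe a map keyed by owned `K` with any borrowed form `Q`,
/// exactly as `HashMap::get` itself is bounded.
fn lookup<'a, K, Q, V>(map: &'a HashMap<K, V>, key: &Q) -> Option<&'a V>
where
    K: Borrow<Q> + Hash + Eq,
    Q: Hash + Eq + ?Sized,
{
    map.get(key)
}

fn main() {
    let mut cache: HashMap<String, u32> = HashMap::new();
    cache.insert("task".to_string(), 7);
    // look up with a &str — no String allocation needed for the probe
    assert_eq!(lookup(&cache, "task"), Some(&7));
}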

impl Backend for MemoryBackend {
@@ -445,15 +550,15 @@ impl Backend for MemoryBackend {
})
}

fn try_read_task_collectibles(
fn read_task_collectibles(
&self,
id: TaskId,
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Result<Result<AutoSet<RawVc>, EventListener>> {
) -> RawVcSetVc {
self.with_task(id, |task| {
task.try_read_task_collectibles(reader, trait_id, self, turbo_tasks)
task.read_task_collectibles(reader, trait_id, self, turbo_tasks)
})
}

@@ -519,12 +624,10 @@
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let result = if let Some(task) = self.task_cache.get(&task_type).map(|task| *task) {
if let Some(task) =
self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks)
{
// fast path without creating a new task
self.connect_task_child(parent_task, task, turbo_tasks);

// TODO maybe force (background) scheduling to avoid inactive tasks hanging in
// "in progress" until they become active
task
} else {
// It's important to avoid overallocating memory as this will go into the task
Expand All @@ -536,27 +639,17 @@ impl Backend for MemoryBackend {
let task = Task::new_persistent(id, task_type.clone(), turbo_tasks.stats_type());
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.insert(*id, task);
self.insert_and_connect_fresh_task(
parent_task,
&self.task_cache,
task_type,
id,
task,
false,
turbo_tasks,
)
}
let result_task = match self.task_cache.entry(task_type) {
Entry::Vacant(entry) => {
// This is the most likely case
entry.insert(id);
id
}
Entry::Occupied(entry) => {
// Safety: We have a fresh task id that nobody knows about yet
unsafe {
self.memory_tasks.remove(*id);
turbo_tasks.reuse_task_id(id);
}
*entry.get()
}
};
self.connect_task_child(parent_task, result_task, turbo_tasks);
result_task
};
result
}
}

fn create_transient_task(
@@ -591,7 +684,7 @@ pub(crate) enum Job {
ScheduleWhenDirtyFromScope(AutoSet<TaskId>),
/// Add tasks from a scope. Scheduled by `run_add_from_scope_queue` to
/// split off work.
AddToScopeQueue(VecDeque<(TaskId, usize)>, TaskScopeId, bool),
AddToScopeQueue(VecDeque<TaskId>, TaskScopeId, usize),
/// Remove tasks from a scope. Scheduled by `run_remove_from_scope_queue` to
/// split off work.
RemoveFromScopeQueue(VecDeque<TaskId>, TaskScopeId),
@@ -618,7 +711,7 @@ impl Job {
Job::RemoveFromScopes(tasks, scopes) => {
for task in tasks {
backend.with_task(task, |task| {
task.remove_from_scopes(scopes.iter().cloned(), backend, turbo_tasks)
task.remove_from_scopes(scopes.iter().copied(), backend, turbo_tasks)
});
}
backend.scope_add_remove_priority.finish_high();
@@ -638,17 +731,11 @@
})
}
}
Job::AddToScopeQueue(queue, id, is_optimization_scope) => {
Job::AddToScopeQueue(queue, id, merging_scopes) => {
backend
.scope_add_remove_priority
.run_low(async {
run_add_to_scope_queue(
queue,
id,
is_optimization_scope,
backend,
turbo_tasks,
);
run_add_to_scope_queue(queue, id, merging_scopes, backend, turbo_tasks);
})
.await;
}
(Diff for the remaining 10 changed files not shown.)