From 3b81a4613af772db493071c247260a415132e3a0 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Wed, 14 Aug 2024 21:08:42 -0700 Subject: [PATCH] refactor(turbo-tasks): Split task state and add a TaskTracker for waiting on local tasks --- Cargo.lock | 44 ++- Cargo.toml | 2 +- turbopack/crates/turbo-tasks/Cargo.toml | 1 + turbopack/crates/turbo-tasks/src/manager.rs | 310 +++++++++++--------- 4 files changed, 186 insertions(+), 171 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 865c4cbe45493..731d2cb6d69c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "allsorts" version = "0.14.1" @@ -1651,7 +1657,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.10.1", + "phf 0.11.2", "serde", "smallvec", ] @@ -2586,6 +2592,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash 0.8.9", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -4617,9 +4627,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" dependencies = [ - "phf_macros 0.10.0", "phf_shared 0.10.0", - "proc-macro-hack", ] [[package]] @@ -4628,7 +4636,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ - "phf_macros 0.11.2", + "phf_macros", "phf_shared 0.11.2", ] @@ -4662,20 +4670,6 @@ dependencies = [ "rand", ] -[[package]] -name = "phf_macros" -version = "0.10.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0" -dependencies = [ - "phf_generator 0.10.0", - "phf_shared 0.10.0", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "phf_macros" version = "0.11.2" @@ -4939,12 +4933,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.79" @@ -7992,16 +7980,17 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.3", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -8374,6 +8363,7 @@ dependencies = [ "serde_regex", "thiserror", "tokio", + "tokio-util", "tracing", "triomphe 0.1.12", "turbo-tasks-build", diff --git a/Cargo.toml b/Cargo.toml index 9c69dcbeb75df..58b341b5e359d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,7 +204,7 @@ tempfile = "3.3.0" thiserror = "1.0.48" tiny-gradient = "0.1.0" tokio = "1.25.0" -tokio-util = { version = "0.7.7", features = ["io"] } +tokio-util = { version = "0.7.11", features = ["io", "rt"] } tracing = "0.1.37" tracing-subscriber = "0.3.16" triomphe = { git = "https://github.com/sokra/triomphe", branch = "sokra/unstable" } diff --git a/turbopack/crates/turbo-tasks/Cargo.toml b/turbopack/crates/turbo-tasks/Cargo.toml index 37a7f23b79f0d..a22c8c7fd9cb9 100644 --- a/turbopack/crates/turbo-tasks/Cargo.toml +++ b/turbopack/crates/turbo-tasks/Cargo.toml @@ -37,6 +37,7 @@ 
serde_json = { workspace = true } serde_regex = "1.1.0" thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } tracing = { workspace = true } triomphe = { workspace = true, features = ["unsize", "unstable"] } turbo-tasks-hash = { workspace = true } diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 319943baacd4f..0614f5ce83ff3 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1,6 +1,5 @@ use std::{ borrow::Cow, - cell::RefCell, future::Future, hash::{BuildHasherDefault, Hash}, mem::take, @@ -8,7 +7,7 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, Weak, + Arc, Mutex, RwLock, Weak, }, thread, time::{Duration, Instant}, @@ -20,6 +19,7 @@ use futures::FutureExt; use rustc_hash::FxHasher; use serde::{de::Visitor, Deserialize, Serialize}; use tokio::{runtime::Handle, select, task_local}; +use tokio_util::task::TaskTracker; use tracing::{info_span, instrument, trace_span, Instrument, Level}; use turbo_tasks_malloc::TurboMalloc; @@ -332,18 +332,17 @@ pub struct TurboTasks { program_start: Instant, } -struct CurrentTaskState { +/// Information about a "global" task. A global task can contain multiple "local" tasks (see +/// [`CurrentLocalTaskState`]), which all share the same global state. +/// +/// A global task is one that: +/// +/// - Has a unique task id. +/// - Is potentially cached. +/// - The backend is aware of. +struct CurrentGlobalTaskState { task_id: TaskId, - /// A unique identifier created for each unique `CurrentTaskState`. Used to - /// check that [`CurrentTaskState::local_cells`] are valid for the current - /// `RawVc::LocalCell`. - execution_id: ExecutionId, - - /// The function's metadata if this is a persistent task. Contains information about arguments - /// passed to the `#[turbo_tasks::function(...)]` macro. 
- function_meta: Option<&'static FunctionMeta>, - /// Affected tasks, that are tracked during task execution. These tasks will /// be invalidated when the execution finishes or before reading a cell /// value. @@ -358,25 +357,49 @@ struct CurrentTaskState { /// This is taken (and becomes `None`) during teardown of a task. cell_counters: Option, 8>>, - /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed - /// (along with `CurrentTaskState`) when the task finishes executing. + /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed (along with + /// `CurrentGlobalTaskState`) when the task finishes executing. local_cells: Vec, + + /// Tracks currently running local tasks, and defers cleanup of the global task until those + /// complete. + local_task_tracker: TaskTracker, } -impl CurrentTaskState { - fn new( - task_id: TaskId, - execution_id: ExecutionId, - function_meta: Option<&'static FunctionMeta>, - ) -> Self { +impl CurrentGlobalTaskState { + fn new(task_id: TaskId) -> Self { Self { task_id, - execution_id, - function_meta, tasks_to_notify: Vec::new(), stateful: false, cell_counters: Some(AutoMap::default()), local_cells: Vec::new(), + local_task_tracker: TaskTracker::new(), + } + } +} + +/// Information specific to the current "local" task. A local task re-uses its parent global task's +/// [`CurrentGlobalTaskState`]. +/// +/// Even if a task itself isn't local, it will have a `CurrentLocalTaskState` representing the root +/// of the global task. +#[derive(Clone)] +struct CurrentLocalTaskState { + /// A unique identifier created for each unique [`CurrentLocalTaskState`]. Used to check that + /// [`CurrentGlobalTaskState::local_cells`] are valid for the current [`RawVc::LocalCell`]. + execution_id: ExecutionId, + + /// The function's metadata if this is a persistent task. Contains information about arguments + /// passed to the `#[turbo_tasks::function(...)]` macro.
+ function_meta: Option<&'static FunctionMeta>, +} + +impl CurrentLocalTaskState { + fn new(execution_id: ExecutionId, function_meta: Option<&'static FunctionMeta>) -> Self { + Self { + execution_id, + function_meta, } } } @@ -386,7 +409,8 @@ task_local! { /// The current TurboTasks instance static TURBO_TASKS: Arc; - static CURRENT_TASK_STATE: RefCell; + static CURRENT_GLOBAL_TASK_STATE: Arc>; + static CURRENT_LOCAL_TASK_STATE: CurrentLocalTaskState; } impl TurboTasks { @@ -687,55 +711,65 @@ impl TurboTasks { let future = async move { let mut schedule_again = true; while schedule_again { - let task_state = RefCell::new(CurrentTaskState::new( - task_id, + let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new(task_id))); + let local_task_state = CurrentLocalTaskState::new( this.execution_id_factory.get(), this.backend .try_get_function_id(task_id) .map(|func_id| &get_function(func_id).function_meta), - )); - schedule_again = CURRENT_TASK_STATE - .scope(task_state, async { - if this.stopped.load(Ordering::Acquire) { - return false; - } + ); + let single_execution_future = async { + if this.stopped.load(Ordering::Acquire) { + return false; + } - let Some(TaskExecutionSpec { future, span }) = - this.backend.try_start_task_execution(task_id, &*this) - else { - return false; - }; - - async { - let (result, duration, memory_usage) = - CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; - - let result = result.map_err(|any| match any.downcast::() { - Ok(owned) => Some(Cow::Owned(*owned)), - Err(any) => match any.downcast::<&'static str>() { - Ok(str) => Some(Cow::Borrowed(*str)), - Err(_) => None, - }, - }); - this.backend.task_execution_result(task_id, result, &*this); - let stateful = this.finish_current_task_state(); - let cell_counters = CURRENT_TASK_STATE - .with(|ts| ts.borrow_mut().cell_counters.take().unwrap()); - let schedule_again = this.backend.task_execution_completed( - task_id, - duration, - memory_usage, - &cell_counters, - 
stateful, - &*this, - ); - // task_execution_completed might need to notify tasks - this.notify_scheduled_tasks(); - schedule_again - } - .instrument(span) - .await - }) + let Some(TaskExecutionSpec { future, span }) = + this.backend.try_start_task_execution(task_id, &*this) + else { + return false; + }; + + async { + let (result, duration, memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + + // wait for all spawned local tasks using `local_cells` to finish + let ltt = CURRENT_GLOBAL_TASK_STATE + .with(|ts| ts.read().unwrap().local_task_tracker.clone()); + ltt.close(); + ltt.wait().await; + + let result = result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + this.backend.task_execution_result(task_id, result, &*this); + let stateful = this.finish_current_task_state(); + let cell_counters = CURRENT_GLOBAL_TASK_STATE + .with(|ts| ts.write().unwrap().cell_counters.take().unwrap()); + let schedule_again = this.backend.task_execution_completed( + task_id, + duration, + memory_usage, + &cell_counters, + stateful, + &*this, + ); + // task_execution_completed might need to notify tasks + this.notify_scheduled_tasks(); + schedule_again + } + .instrument(span) + .await + }; + schedule_again = CURRENT_GLOBAL_TASK_STATE + .scope( + global_task_state, + CURRENT_LOCAL_TASK_STATE.scope(local_task_state, single_execution_future), + ) .await; } this.finish_primary_job(); @@ -1048,12 +1082,12 @@ impl TurboTasks { } fn finish_current_task_state(&self) -> bool { - let (stateful, tasks) = CURRENT_TASK_STATE.with(|cell| { - let CurrentTaskState { + let (stateful, tasks) = CURRENT_GLOBAL_TASK_STATE.with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, stateful, .. 
- } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); (*stateful, take(tasks_to_notify)) }); @@ -1176,11 +1210,11 @@ impl TurboTasksApi for TurboTasks { } fn notify_scheduled_tasks(&self) { - let _ = CURRENT_TASK_STATE.try_with(|cell| { + let _ = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { let tasks = { - let CurrentTaskState { + let CurrentGlobalTaskState { tasks_to_notify, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); take(tasks_to_notify) }; if tasks.is_empty() { @@ -1294,28 +1328,32 @@ impl TurboTasksApi for TurboTasks { self.backend.mark_own_task_as_finished(task, self); } + /// Creates a future that inherits the current task id and task state. The current global task + /// will wait for this future to be dropped before exiting. fn detached_for_testing( &self, - f: Pin> + Send + 'static>>, + fut: Pin> + Send + 'static>>, ) -> Pin> + Send + 'static>> { - let current_task_state_facade = CURRENT_TASK_STATE.with(|ts| { - let ts = ts.borrow(); - CurrentTaskState { - task_id: ts.task_id, - execution_id: ts.execution_id, - function_meta: ts.function_meta, - tasks_to_notify: Vec::new(), - stateful: false, - cell_counters: ts.cell_counters.clone(), - local_cells: ts.local_cells.clone(), - } - }); - let current_task_id = current_task_state_facade.task_id; + // this is similar to what happens for a local task, except that we keep the local task's + // state as well. 
+ let global_task_state = CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.clone()); + let local_task_state = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.clone()); + let (task_id, fut) = { + let ts = global_task_state.read().unwrap(); + (ts.task_id, ts.local_task_tracker.track_future(fut)) + }; Box::pin(TURBO_TASKS.scope( turbo_tasks(), - CURRENT_TASK_STATE.scope( - RefCell::new(current_task_state_facade), - self.backend.execution_scope(current_task_id, f), + CURRENT_GLOBAL_TASK_STATE.scope( + global_task_state, + CURRENT_LOCAL_TASK_STATE.scope( + local_task_state, + // TODO(bgw): This will create a new task-local in the backend, which is not + // what we want. Instead we should replace `execution_scope` with a more + // limited API that allows storing thread-local state in a way the manager can + // control. + self.backend.execution_scope(task_id, fut), + ), ), )) } @@ -1381,10 +1419,10 @@ impl TurboTasksBackendApi for TurboTasks { /// Enqueues tasks for notification of changed dependencies. This will /// eventually call `dependent_cell_updated()` on all tasks. fn schedule_notify_tasks(&self, tasks: &[TaskId]) { - let result = CURRENT_TASK_STATE.try_with(|cell| { - let CurrentTaskState { + let result = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { @@ -1396,10 +1434,10 @@ impl TurboTasksBackendApi for TurboTasks { /// Enqueues tasks for notification of changed dependencies. This will /// eventually call `dependent_cell_updated()` on all tasks. fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) { - let result = CURRENT_TASK_STATE.try_with(|cell| { - let CurrentTaskState { + let result = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, .. 
- } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { @@ -1437,7 +1475,7 @@ impl TurboTasksBackendApi for TurboTasks { } pub(crate) fn current_task(from: &str) -> TaskId { - match CURRENT_TASK_STATE.try_with(|ts| ts.borrow().task_id) { + match CURRENT_GLOBAL_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) { Ok(id) => id, Err(_) => panic!( "{} can only be used in the context of turbo_tasks task execution", @@ -1649,9 +1687,9 @@ pub fn with_turbo_tasks_for_testing( ) -> impl Future { TURBO_TASKS.scope( tt, - CURRENT_TASK_STATE.scope( - RefCell::new(CurrentTaskState::new(current_task, execution_id, None)), - f, + CURRENT_GLOBAL_TASK_STATE.scope( + Arc::new(RwLock::new(CurrentGlobalTaskState::new(current_task))), + CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f), ), ) } @@ -1665,7 +1703,7 @@ pub fn spawn_detached_for_testing(f: impl Future> + Send + ' } pub fn current_task_for_testing() -> TaskId { - CURRENT_TASK_STATE.with(|ts| ts.borrow().task_id) + CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.read().unwrap().task_id) } /// Get an [`Invalidator`] that can be used to invalidate the current task @@ -1690,8 +1728,8 @@ pub fn mark_finished() { /// Marks the current task as stateful. This prevents the tasks from being /// dropped without persisting the state. pub fn mark_stateful() { - CURRENT_TASK_STATE.with(|cell| { - let CurrentTaskState { stateful, .. } = &mut *cell.borrow_mut(); + CURRENT_GLOBAL_TASK_STATE.with(|cell| { + let CurrentGlobalTaskState { stateful, .. 
} = &mut *cell.write().unwrap(); *stateful = true; }) } @@ -1947,9 +1985,9 @@ impl From for RawVc { } pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { - CURRENT_TASK_STATE.with(|ts| { + CURRENT_GLOBAL_TASK_STATE.with(|ts| { let current_task = current_task("celling turbo_tasks values"); - let mut ts = ts.borrow_mut(); + let mut ts = ts.write().unwrap(); let map = ts.cell_counters.as_mut().unwrap(); let current_index = map.entry(ty).or_default(); let index = *current_index; @@ -1962,30 +2000,24 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { } pub(crate) fn try_get_function_meta() -> Option<&'static FunctionMeta> { - CURRENT_TASK_STATE.with(|ts| ts.borrow().function_meta) + CURRENT_LOCAL_TASK_STATE.with(|ts| ts.function_meta) } pub(crate) fn create_local_cell(value: TypedSharedReference) -> (ExecutionId, LocalCellId) { - CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { - execution_id, - local_cells, - .. - } = &mut *ts.borrow_mut(); - + let execution_id = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.execution_id); + let raw_local_cell_id = CURRENT_GLOBAL_TASK_STATE.with(|ts| { + let CurrentGlobalTaskState { local_cells, .. } = &mut *ts.write().unwrap(); // store in the task-local arena local_cells.push(value); - - // generate a one-indexed id - let raw_local_cell_id = local_cells.len(); - let local_cell_id = if cfg!(debug_assertions) { - LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) - } else { - unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } - }; - - (*execution_id, local_cell_id) - }) + local_cells.len() + }); + // generate a one-indexed id + let local_cell_id = if cfg!(debug_assertions) { + LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) + } else { + unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } + }; + (execution_id, local_cell_id) } /// Returns the contents of the given local cell. 
Panics if a local cell is @@ -2001,13 +2033,9 @@ pub(crate) fn read_local_cell( execution_id: ExecutionId, local_cell_id: LocalCellId, ) -> TypedSharedReference { - CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { - execution_id: expected_execution_id, - local_cells, - .. - } = &*ts.borrow(); - assert_eq_local_cell(execution_id, *expected_execution_id); + assert_execution_id(execution_id); + CURRENT_GLOBAL_TASK_STATE.with(|ts| { + let CurrentGlobalTaskState { local_cells, .. } = &*ts.write().unwrap(); // local cell ids are one-indexed (they use NonZeroU32) local_cells[(*local_cell_id as usize) - 1].clone() }) @@ -2025,19 +2053,15 @@ pub(crate) async fn read_local_output( /// Panics if the [`ExecutionId`] does not match the current task's /// `execution_id`. pub(crate) fn assert_execution_id(execution_id: ExecutionId) { - CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { + CURRENT_LOCAL_TASK_STATE.with(|ts| { + let CurrentLocalTaskState { execution_id: expected_execution_id, .. - } = &*ts.borrow(); - assert_eq_local_cell(execution_id, *expected_execution_id); + } = ts; + assert_eq!( + &execution_id, expected_execution_id, + "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \ + Vc to convert it into a non-local version." + ); }) } - -fn assert_eq_local_cell(actual: ExecutionId, expected: ExecutionId) { - assert_eq!( - actual, expected, - "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the Vc \ - to convert it into a non-local version." - ); -}