diff --git a/Cargo.lock b/Cargo.lock index 865c4cbe454939..731d2cb6d69c7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "allsorts" version = "0.14.1" @@ -1651,7 +1657,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.10.1", + "phf 0.11.2", "serde", "smallvec", ] @@ -2586,6 +2592,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash 0.8.9", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -4617,9 +4627,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" dependencies = [ - "phf_macros 0.10.0", "phf_shared 0.10.0", - "proc-macro-hack", ] [[package]] @@ -4628,7 +4636,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ - "phf_macros 0.11.2", + "phf_macros", "phf_shared 0.11.2", ] @@ -4662,20 +4670,6 @@ dependencies = [ "rand", ] -[[package]] -name = "phf_macros" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0" -dependencies = [ - "phf_generator 0.10.0", - "phf_shared 0.10.0", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "phf_macros" version = "0.11.2" @@ -4939,12 +4933,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.79" @@ -7992,16 +7980,17 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.3", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -8374,6 +8363,7 @@ dependencies = [ "serde_regex", "thiserror", "tokio", + "tokio-util", "tracing", "triomphe 0.1.12", "turbo-tasks-build", diff --git a/Cargo.toml b/Cargo.toml index 9c69dcbeb75dfd..58b341b5e359d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,7 +204,7 @@ tempfile = "3.3.0" thiserror = "1.0.48" tiny-gradient = "0.1.0" tokio = "1.25.0" -tokio-util = { version = "0.7.7", features = ["io"] } +tokio-util = { version = "0.7.11", features = ["io", "rt"] } tracing = "0.1.37" tracing-subscriber = "0.3.16" triomphe = { git = "https://github.com/sokra/triomphe", branch = "sokra/unstable" } diff --git a/turbopack/crates/turbo-tasks-memory/tests/detached.rs b/turbopack/crates/turbo-tasks-memory/tests/detached.rs new file mode 120000 index 00000000000000..e726e54a7881e6 --- /dev/null +++ b/turbopack/crates/turbo-tasks-memory/tests/detached.rs @@ -0,0 +1 @@ +../../turbo-tasks-testing/tests/detached.rs \ No newline at end of file diff --git a/turbopack/crates/turbo-tasks-testing/tests/detached.rs b/turbopack/crates/turbo-tasks-testing/tests/detached.rs new file mode 100644 index 00000000000000..e738d43ae76f0e --- /dev/null +++ b/turbopack/crates/turbo-tasks-testing/tests/detached.rs @@ -0,0 +1,54 @@ +#![feature(arbitrary_self_types)] + +use tokio::{ + sync::{watch, Notify}, + time::{timeout, Duration}, +}; +use turbo_tasks::{turbo_tasks, Completion, TransientInstance, Vc}; +use turbo_tasks_testing::{register, run, Registration}; + +static REGISTRATION: Registration = register!(); + +#[tokio::test] +async fn test_spawns_detached() -> anyhow::Result<()> { + run(®ISTRATION, || async { + let notify = TransientInstance::new(Notify::new()); + let (tx, mut rx) = watch::channel(None); + + // create the task + let out_vc = spawns_detached(notify.clone(), TransientInstance::new(tx)); + + // see that the task does not exit yet + timeout(Duration::from_millis(100), out_vc.strongly_consistent()) + .await + .expect_err("should wait on the detached task"); + + // let the detached future exit + notify.notify_waiters(); + + // it should send us back a cell + let detached_vc: Vc = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap(); + assert_eq!(*detached_vc.await.unwrap(), 42); + + // the parent task should now be able to exit + out_vc.strongly_consistent().await.unwrap(); + + Ok(()) + }) + .await +} + +#[turbo_tasks::function] +fn spawns_detached( + notify: TransientInstance, + sender: TransientInstance>>>, +) -> Vc { + tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move { + notify.notified().await; + // creating cells after the normal lifetime of the task should be okay, as the parent task + // is waiting on us before exiting! + sender.send(Some(Vc::cell(42))).unwrap(); + Ok(()) + }))); + Completion::new() +} diff --git a/turbopack/crates/turbo-tasks/Cargo.toml b/turbopack/crates/turbo-tasks/Cargo.toml index 37a7f23b79f0db..a22c8c7fd9cb93 100644 --- a/turbopack/crates/turbo-tasks/Cargo.toml +++ b/turbopack/crates/turbo-tasks/Cargo.toml @@ -37,6 +37,7 @@ serde_json = { workspace = true } serde_regex = "1.1.0" thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } tracing = { workspace = true } triomphe = { workspace = true, features = ["unsize", "unstable"] } turbo-tasks-hash = { workspace = true } diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index fde9512d617e85..18aa99080789cb 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1,6 +1,5 @@ use std::{ borrow::Cow, - cell::RefCell, future::Future, hash::{BuildHasherDefault, Hash}, mem::take, @@ -8,7 +7,7 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, Weak, + Arc, Mutex, RwLock, Weak, }, thread, time::{Duration, Instant}, @@ -20,6 +19,7 @@ use futures::FutureExt; use rustc_hash::FxHasher; use serde::{de::Visitor, Deserialize, Serialize}; use tokio::{runtime::Handle, select, task_local}; +use tokio_util::task::TaskTracker; use tracing::{info_span, instrument, trace_span, Instrument, Level}; use turbo_tasks_malloc::TurboMalloc; @@ -335,9 +335,13 @@ struct CurrentTaskState { /// This is taken (and becomes `None`) during teardown of a task. cell_counters: Option, 8>>, - /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed - /// (along with `CurrentTaskState`) when the task finishes executing. + /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed (along with + /// `CurrentGlobalTaskState`) when the task finishes executing. local_cells: Vec, + + /// Tracks currently running local tasks, and defers cleanup of the global task until those + /// complete. + local_task_tracker: TaskTracker, } impl CurrentTaskState { @@ -354,6 +358,7 @@ impl CurrentTaskState { stateful: false, cell_counters: Some(AutoMap::default()), local_cells: Vec::new(), + local_task_tracker: TaskTracker::new(), } } } @@ -363,7 +368,7 @@ task_local! { /// The current TurboTasks instance static TURBO_TASKS: Arc; - static CURRENT_TASK_STATE: RefCell; + static CURRENT_TASK_STATE: Arc>; } impl TurboTasks { @@ -665,13 +670,13 @@ impl TurboTasks { let future = async move { let mut schedule_again = true; while schedule_again { - let task_state = RefCell::new(CurrentTaskState::new( + let task_state = Arc::new(RwLock::new(CurrentTaskState::new( task_id, this.execution_id_factory.get(), this.backend .try_get_function_id(task_id) .map(|func_id| &get_function(func_id).function_meta), - )); + ))); schedule_again = CURRENT_TASK_STATE .scope(task_state, async { if this.stopped.load(Ordering::Acquire) { @@ -688,6 +693,12 @@ impl TurboTasks { let (result, duration, memory_usage) = CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + // wait for all spawned local tasks using `local_cells` to finish + let ltt = CURRENT_TASK_STATE + .with(|ts| ts.read().unwrap().local_task_tracker.clone()); + ltt.close(); + ltt.wait().await; + let result = result.map_err(|any| match any.downcast::() { Ok(owned) => Some(Cow::Owned(*owned)), Err(any) => match any.downcast::<&'static str>() { @@ -698,7 +709,7 @@ impl TurboTasks { this.backend.task_execution_result(task_id, result, &*this); let stateful = this.finish_current_task_state(); let cell_counters = CURRENT_TASK_STATE - .with(|ts| ts.borrow_mut().cell_counters.take().unwrap()); + .with(|ts| ts.write().unwrap().cell_counters.take().unwrap()); let schedule_again = this.backend.task_execution_completed( task_id, duration, @@ -1031,7 +1042,7 @@ impl TurboTasks { tasks_to_notify, stateful, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); (*stateful, take(tasks_to_notify)) }); @@ -1158,7 +1169,7 @@ impl TurboTasksApi for TurboTasks { let tasks = { let CurrentTaskState { tasks_to_notify, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); take(tasks_to_notify) }; if tasks.is_empty() { @@ -1272,28 +1283,28 @@ impl TurboTasksApi for TurboTasks { self.backend.mark_own_task_as_finished(task, self); } + /// Creates a future that inherits the current task id and task state. The current global task + /// will wait for this future to be dropped before exiting. fn detached_for_testing( &self, - f: Pin> + Send + 'static>>, + fut: Pin> + Send + 'static>>, ) -> Pin> + Send + 'static>> { - let current_task_state_facade = CURRENT_TASK_STATE.with(|ts| { - let ts = ts.borrow(); - CurrentTaskState { - task_id: ts.task_id, - execution_id: ts.execution_id, - function_meta: ts.function_meta, - tasks_to_notify: Vec::new(), - stateful: false, - cell_counters: ts.cell_counters.clone(), - local_cells: ts.local_cells.clone(), - } - }); - let current_task_id = current_task_state_facade.task_id; + // this is similar to what happens for a local task, except that we keep the local task's + // state as well. + let task_state = CURRENT_TASK_STATE.with(|ts| ts.clone()); + let (task_id, fut) = { + let ts = task_state.read().unwrap(); + (ts.task_id, ts.local_task_tracker.track_future(fut)) + }; Box::pin(TURBO_TASKS.scope( turbo_tasks(), CURRENT_TASK_STATE.scope( - RefCell::new(current_task_state_facade), - self.backend.execution_scope(current_task_id, f), + task_state, + // TODO(bgw): This will create a new task-local in the backend, which is not + // what we want. Instead we should replace `execution_scope` with a more + // limited API that allows storing thread-local state in a way the manager can + // control. + self.backend.execution_scope(task_id, fut), ), )) } @@ -1362,7 +1373,7 @@ impl TurboTasksBackendApi for TurboTasks { let result = CURRENT_TASK_STATE.try_with(|cell| { let CurrentTaskState { tasks_to_notify, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { @@ -1377,7 +1388,7 @@ impl TurboTasksBackendApi for TurboTasks { let result = CURRENT_TASK_STATE.try_with(|cell| { let CurrentTaskState { tasks_to_notify, .. - } = &mut *cell.borrow_mut(); + } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); }); if result.is_err() { @@ -1415,7 +1426,7 @@ impl TurboTasksBackendApi for TurboTasks { } pub(crate) fn current_task(from: &str) -> TaskId { - match CURRENT_TASK_STATE.try_with(|ts| ts.borrow().task_id) { + match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) { Ok(id) => id, Err(_) => panic!( "{} can only be used in the context of turbo_tasks task execution", @@ -1628,7 +1639,11 @@ pub fn with_turbo_tasks_for_testing( TURBO_TASKS.scope( tt, CURRENT_TASK_STATE.scope( - RefCell::new(CurrentTaskState::new(current_task, execution_id, None)), + Arc::new(RwLock::new(CurrentTaskState::new( + current_task, + execution_id, + None, + ))), f, ), ) @@ -1643,7 +1658,7 @@ pub fn spawn_detached_for_testing(f: impl Future> + Send + ' } pub fn current_task_for_testing() -> TaskId { - CURRENT_TASK_STATE.with(|ts| ts.borrow().task_id) + CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id) } /// Get an [`Invalidator`] that can be used to invalidate the current task @@ -1669,7 +1684,7 @@ pub fn mark_finished() { /// dropped without persisting the state. pub fn mark_stateful() { CURRENT_TASK_STATE.with(|cell| { - let CurrentTaskState { stateful, .. } = &mut *cell.borrow_mut(); + let CurrentTaskState { stateful, .. } = &mut *cell.write().unwrap(); *stateful = true; }) } @@ -1927,7 +1942,7 @@ impl From for RawVc { pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { CURRENT_TASK_STATE.with(|ts| { let current_task = current_task("celling turbo_tasks values"); - let mut ts = ts.borrow_mut(); + let mut ts = ts.write().unwrap(); let map = ts.cell_counters.as_mut().unwrap(); let current_index = map.entry(ty).or_default(); let index = *current_index; @@ -1940,30 +1955,28 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { } pub(crate) fn try_get_function_meta() -> Option<&'static FunctionMeta> { - CURRENT_TASK_STATE.with(|ts| ts.borrow().function_meta) + CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().function_meta) } pub(crate) fn create_local_cell(value: TypedSharedReference) -> (ExecutionId, LocalCellId) { - CURRENT_TASK_STATE.with(|ts| { + let (execution_id, raw_local_cell_id) = CURRENT_TASK_STATE.with(|ts| { let CurrentTaskState { execution_id, local_cells, .. - } = &mut *ts.borrow_mut(); + } = &mut *ts.write().unwrap(); // store in the task-local arena local_cells.push(value); - - // generate a one-indexed id - let raw_local_cell_id = local_cells.len(); - let local_cell_id = if cfg!(debug_assertions) { - LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) - } else { - unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } - }; - - (*execution_id, local_cell_id) - }) + (*execution_id, local_cells.len()) + }); + // generate a one-indexed id + let local_cell_id = if cfg!(debug_assertions) { + LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) + } else { + unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } + }; + (execution_id, local_cell_id) } /// Returns the contents of the given local cell. Panics if a local cell is @@ -1984,7 +1997,7 @@ pub(crate) fn read_local_cell( execution_id: expected_execution_id, local_cells, .. - } = &*ts.borrow(); + } = &*ts.write().unwrap(); assert_eq_local_cell(execution_id, *expected_execution_id); // local cell ids are one-indexed (they use NonZeroU32) local_cells[(*local_cell_id as usize) - 1].clone() @@ -1998,7 +2011,7 @@ pub(crate) fn assert_execution_id(execution_id: ExecutionId) { let CurrentTaskState { execution_id: expected_execution_id, .. - } = &*ts.borrow(); + } = &*ts.read().unwrap(); assert_eq_local_cell(execution_id, *expected_execution_id); }) }