Skip to content

Commit

Permalink
Minimal implementation of local cells
Browse files Browse the repository at this point in the history
  • Loading branch information
bgw committed Jul 18, 2024
1 parent b3460d1 commit 9fadf1a
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 70 deletions.
10 changes: 10 additions & 0 deletions crates/turbo-tasks-macros/src/value_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ pub fn value(args: TokenStream, input: TokenStream) -> TokenStream {
let content = self;
turbo_tasks::Vc::cell_private(#cell_access_content)
}

/// Places a value in a task-local cell stored in the current task.
///
/// Task-local cells are stored in a task-local arena, and do not persist outside the
/// lifetime of the current task (including child tasks). Task-local cells can be resolved
/// to be converted into normal cells.
#cell_prefix fn local_cell(self) -> turbo_tasks::Vc<Self> {
let content = self;
turbo_tasks::Vc::cell_private(#cell_access_content)
}
};

let into = if let IntoMode::New | IntoMode::Shared = into_mode {
Expand Down
2 changes: 1 addition & 1 deletion crates/turbo-tasks-memory/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Display for OutputContent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutputContent::Empty => write!(f, "empty"),
OutputContent::Link(raw_vc) => write!(f, "link {}", raw_vc),
OutputContent::Link(raw_vc) => write!(f, "link {:?}", raw_vc),
OutputContent::Error(err) => write!(f, "error {}", err),
OutputContent::Panic(Some(message)) => write!(f, "panic {}", message),
OutputContent::Panic(None) => write!(f, "panic"),
Expand Down
26 changes: 26 additions & 0 deletions crates/turbo-tasks-memory/tests/local_cell.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![feature(arbitrary_self_types)]

use turbo_tasks::Vc;
use turbo_tasks_testing::{register, run};

register!();

#[turbo_tasks::value]
struct Wrapper(u32);

#[turbo_tasks::value(transparent)]
struct TransparentWrapper(u32);

#[tokio::test]
async fn store_and_read() {
run! {
let a: Vc<u32> = Vc::local_cell(42);
assert_eq!(*a.await.unwrap(), 42);

let b = Wrapper(42).local_cell();
assert_eq!((*b.await.unwrap()).0, 42);

let c = TransparentWrapper(42).local_cell();
assert_eq!(*c.await.unwrap(), 42);
}
}
1 change: 1 addition & 0 deletions crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ define_id!(ValueTypeId: u32);
define_id!(TraitTypeId: u32);
define_id!(BackendJobId: u32);
define_id!(ExecutionId: u64, derive(Debug));
define_id!(LocalCellId: u32, derive(Debug));

impl Debug for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
160 changes: 115 additions & 45 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::{
},
capture_future::{self, CaptureFuture},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id_factory::IdFactoryWithReuse,
id::{BackendJobId, ExecutionId, FunctionId, LocalCellId, TraitTypeId},
id_factory::{IdFactory, IdFactoryWithReuse},
raw_vc::{CellId, RawVc},
trace::TraceRawVcs,
trait_helpers::get_trait_method,
Expand Down Expand Up @@ -241,6 +241,7 @@ pub struct TurboTasks<B: Backend + 'static> {
this: Weak<Self>,
backend: B,
task_id_factory: IdFactoryWithReuse<TaskId>,
execution_id_factory: IdFactory<ExecutionId>,
stopped: AtomicBool,
currently_scheduled_tasks: AtomicUsize,
currently_scheduled_foreground_jobs: AtomicUsize,
Expand All @@ -255,7 +256,6 @@ pub struct TurboTasks<B: Backend + 'static> {
program_start: Instant,
}

#[derive(Default)]
struct CurrentTaskState {
/// Affected tasks, that are tracked during task execution. These tasks will
/// be invalidated when the execution finishes or before reading a cell
Expand All @@ -264,6 +264,26 @@ struct CurrentTaskState {

/// True if the current task has state in cells
stateful: bool,

/// A unique identifier created for each unique `CurrentTaskState`. Used to
/// check that [`CurrentTaskState::local_cells`] are valid for the current
/// `RawVc::LocalCell`.
execution_id: ExecutionId,

/// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed
/// (along with `CurrentTaskState`) when
local_cells: Vec<SharedReference>,
}

impl CurrentTaskState {
fn new(execution_id: ExecutionId) -> Self {
Self {
tasks_to_notify: Vec::new(),
stateful: false,
execution_id,
local_cells: Vec::new(),
}
}
}

// TODO implement our own thread pool and make these thread locals instead
Expand Down Expand Up @@ -291,6 +311,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
this: this.clone(),
backend,
task_id_factory,
execution_id_factory: IdFactory::new(),
stopped: AtomicBool::new(false),
currently_scheduled_tasks: AtomicUsize::new(0),
currently_scheduled_background_jobs: AtomicUsize::new(0),
Expand Down Expand Up @@ -488,50 +509,54 @@ impl<B: Backend + 'static> TurboTasks<B> {
let future = async move {
#[allow(clippy::blocks_in_conditions)]
while CURRENT_TASK_STATE
.scope(Default::default(), async {
if this.stopped.load(Ordering::Acquire) {
return false;
}
.scope(
RefCell::new(CurrentTaskState::new(this.execution_id_factory.get())),
async {
if this.stopped.load(Ordering::Acquire) {
return false;
}

// Setup thread locals
CELL_COUNTERS
.scope(Default::default(), async {
let Some(TaskExecutionSpec { future, span }) =
this.backend.try_start_task_execution(task_id, &*this)
else {
return false;
};

async {
let (result, duration, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result = result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Err(any) => match any.downcast::<&'static str>() {
Ok(str) => Some(Cow::Borrowed(*str)),
Err(_) => None,
},
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let schedule_again = this.backend.task_execution_completed(
task_id,
duration,
memory_usage,
stateful,
&*this,
);
// task_execution_completed might need to notify tasks
this.notify_scheduled_tasks();
schedule_again
}
.instrument(span)
// Setup thread locals
CELL_COUNTERS
.scope(Default::default(), async {
let Some(TaskExecutionSpec { future, span }) =
this.backend.try_start_task_execution(task_id, &*this)
else {
return false;
};

async {
let (result, duration, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result =
result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Err(any) => match any.downcast::<&'static str>() {
Ok(str) => Some(Cow::Borrowed(*str)),
Err(_) => None,
},
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let schedule_again = this.backend.task_execution_completed(
task_id,
duration,
memory_usage,
stateful,
&*this,
);
// task_execution_completed might need to notify tasks
this.notify_scheduled_tasks();
schedule_again
}
.instrument(span)
.await
})
.await
})
.await
})
},
)
.await
{}
this.finish_primary_job();
Expand Down Expand Up @@ -836,6 +861,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
let CurrentTaskState {
tasks_to_notify,
stateful,
..
} = &mut *cell.borrow_mut();
(*stateful, take(tasks_to_notify))
});
Expand Down Expand Up @@ -1602,3 +1628,47 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
}
})
}

pub(crate) fn create_local_cell(value: SharedReference) -> (ExecutionId, LocalCellId) {
CURRENT_TASK_STATE.with(|cell| {
let CurrentTaskState {
execution_id,
local_cells,
..
} = &mut *cell.borrow_mut();

// store in the task-local arena
local_cells.push(value);

// generate a one-indexed id
let raw_local_cell_id = local_cells.len();
let local_cell_id = if cfg!(debug_assertions) {
LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap())
} else {
unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) }
};

(*execution_id, local_cell_id)
})
}

/// Panics if the ExecutionId does not match the expected value.
pub(crate) fn read_local_cell(
execution_id: ExecutionId,
local_cell_id: LocalCellId,
) -> SharedReference {
CURRENT_TASK_STATE.with(|cell| {
let CurrentTaskState {
execution_id: expected_execution_id,
local_cells,
..
} = &*cell.borrow();
assert_eq!(
execution_id, *expected_execution_id,
"This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \
Vc to convert it into a non-local version."
);
// local cell ids are one-indexed (they use NonZeroU32)
local_cells[(*local_cell_id as usize) - 1].clone()
})
}
Loading

0 comments on commit 9fadf1a

Please sign in to comment.