Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] add lmdb persisting and restoring #68680

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
bincode = "1.3.3"
byteorder = "1.5.0"
dashmap = { workspace = true }
indexmap = { workspace = true }
lmdb = "0.8.0"
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
Expand Down
149 changes: 130 additions & 19 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ mod storage;

use std::{
borrow::Cow,
collections::HashSet,
collections::{HashMap, HashSet},
future::Future,
hash::BuildHasherDefault,
mem::take,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
Expand All @@ -37,6 +38,7 @@ use turbo_tasks::{

use self::{operation::ExecuteContext, storage::Storage};
use crate::{
backing_storage::BackingStorage,
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef,
InProgressState, OutputValue, RootType,
Expand All @@ -62,6 +64,8 @@ impl SnapshotRequest {
}

pub struct TurboTasksBackend {
start_time: Instant,

persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
transient_task_id_factory: IdFactoryWithReuse<TaskId>,

Expand All @@ -87,18 +91,20 @@ pub struct TurboTasksBackend {
/// Condition Variable that is triggered when a snapshot is completed and
/// operations can continue.
snapshot_completed: Condvar,
}
/// The timestamp of the last started snapshot.
last_snapshot: AtomicU64,

impl Default for TurboTasksBackend {
fn default() -> Self {
Self::new()
}
backing_storage: Arc<dyn BackingStorage + Sync + Send>,
}

impl TurboTasksBackend {
pub fn new() -> Self {
pub fn new(backing_storage: Arc<dyn BackingStorage + Sync + Send>) -> Self {
Self {
persisted_task_id_factory: IdFactoryWithReuse::new(1, (TRANSIENT_TASK_BIT - 1) as u64),
start_time: Instant::now(),
persisted_task_id_factory: IdFactoryWithReuse::new(
*backing_storage.next_free_task_id() as u64,
(TRANSIENT_TASK_BIT - 1) as u64,
),
transient_task_id_factory: IdFactoryWithReuse::new(
TRANSIENT_TASK_BIT as u64,
u32::MAX as u64,
Expand All @@ -112,6 +118,8 @@ impl TurboTasksBackend {
snapshot_request: Mutex::new(SnapshotRequest::new()),
operations_suspended: Condvar::new(),
snapshot_completed: Condvar::new(),
last_snapshot: AtomicU64::new(0),
backing_storage,
}
}

Expand Down Expand Up @@ -223,7 +231,7 @@ impl TurboTasksBackend {
}

if matches!(consistency, ReadConsistency::Strong) {
todo!("Handle strongly consistent read: {task:#?}");
// todo!("Handle strongly consistent read: {task:#?}");
}

if let Some(output) = get!(task, Output) {
Expand Down Expand Up @@ -286,7 +294,10 @@ impl TurboTasksBackend {
reader_task.add(CachedDataItem::CellDependency { target, value: () });
}
}
return Ok(Ok(CellContent(Some(content)).into_typed(cell.type_id)));
return Ok(Ok(TypedCellContent(
cell.type_id,
CellContent(Some(content.1)),
)));
}

todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}");
Expand All @@ -296,11 +307,86 @@ impl TurboTasksBackend {
if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
return Some(task_type);
}
if let Some(task_type) = self.backing_storage.reverse_lookup_task_cache(task_id) {
let _ = self.task_cache.try_insert(task_type.clone(), task_id);
return Some(task_type);
}
None
}

/// Freezes the in-memory state and writes all pending changes to the
/// backing storage.
///
/// All in-flight operations are suspended first, the persist logs are
/// drained, and operations are resumed *before* the (potentially slow)
/// storage write happens.
///
/// Returns the `Instant` at which the state was frozen, or `None` when
/// saving to the backing storage failed.
fn snapshot(&self) -> Option<Instant> {
    // Request a snapshot and wait until every currently running operation
    // has reached a suspend point.
    let mut snapshot_request = self.snapshot_request.lock();
    snapshot_request.snapshot_requested = true;
    let active_operations = self
        .in_progress_operations
        .fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
    if active_operations != 0 {
        self.operations_suspended
            .wait_while(&mut snapshot_request, |_| {
                self.in_progress_operations
                    .load(std::sync::atomic::Ordering::Relaxed)
                    != SNAPSHOT_REQUESTED_BIT
            });
    }
    let suspended_operations = snapshot_request
        .suspended_operations
        .iter()
        .map(|op| op.arc().clone())
        .collect::<Vec<_>>();
    drop(snapshot_request);
    // Steal the logs while everything is quiescent.
    let persisted_storage_log = take(&mut *self.persisted_storage_log.lock());
    let persisted_task_cache_log = take(&mut *self.persisted_task_cache_log.lock());
    // Clear the request and let suspended operations continue; the disk
    // write below happens concurrently with resumed work.
    let mut snapshot_request = self.snapshot_request.lock();
    snapshot_request.snapshot_requested = false;
    self.in_progress_operations
        .fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
    self.snapshot_completed.notify_all();
    let snapshot_time = Instant::now();
    drop(snapshot_request);

    // Count pending updates per task so their persistance state can be
    // marked finished once the write is done.
    let mut counts: HashMap<TaskId, u32> = HashMap::new();
    for CachedDataUpdate { task, .. } in persisted_storage_log.iter() {
        *counts.entry(*task).or_default() += 1;
    }

    if !persisted_task_cache_log.is_empty() || !persisted_storage_log.is_empty() {
        if let Err(err) = self.backing_storage.save_snapshot(
            suspended_operations,
            persisted_task_cache_log,
            persisted_storage_log,
        ) {
            // NOTE(review): on failure the drained logs are dropped and the
            // per-task counts below are never finished — confirm intended.
            println!("Persisting failed: {:#?}", err);
            return None;
        }
        println!("Snapshot saved");
    }

    for (task_id, count) in counts {
        self.storage
            .access_mut(task_id)
            .persistance_state
            .finish_persisting_items(count);
    }

    Some(snapshot_time)
}
}

impl Backend for TurboTasksBackend {
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
    // Replay operations that were interrupted by the previous shutdown.
    // The snapshot job has not been scheduled yet, so nothing can suspend
    // them while they run.
    let ctx = self.execute_context(turbo_tasks);
    for operation in self.backing_storage.uncompleted_operations() {
        operation.execute(&ctx);
    }

    // Kick off the periodic snapshot background job (job id 1).
    turbo_tasks.schedule_backend_background_job(BackendJobId::from(1));
}

fn get_or_create_persistent_task(
&self,
task_type: CachedTaskType,
Expand All @@ -312,6 +398,12 @@ impl Backend for TurboTasksBackend {
return task_id;
}

if let Some(task_id) = self.backing_storage.forward_lookup_task_cache(&task_type) {
let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
self.connect_child(parent_task, task_id, turbo_tasks);
return task_id;
}

let task_type = Arc::new(task_type);
let task_id = self.persisted_task_id_factory.get();
if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
Expand Down Expand Up @@ -669,12 +761,31 @@ impl Backend for TurboTasksBackend {
stale
}

fn run_backend_job(
&self,
_: BackendJobId,
_: &dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
todo!()
fn run_backend_job<'a>(
    &'a self,
    id: BackendJobId,
    turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    Box::pin(async move {
        // Job 1 is the periodic snapshot job scheduled from `startup`.
        if *id == 1 {
            // Minimum wall-clock distance between two snapshots.
            const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(1);

            // Reconstruct the `Instant` of the last snapshot from the
            // millisecond offset stored relative to `start_time`.
            let millis = self.last_snapshot.load(Ordering::Relaxed);
            let previous = self.start_time + Duration::from_millis(millis);
            let since_last = previous.elapsed();
            if since_last < SNAPSHOT_INTERVAL {
                tokio::time::sleep(SNAPSHOT_INTERVAL - since_last).await;
            }

            if let Some(timestamp) = self.snapshot() {
                // Record the new snapshot time (as millis since start) and
                // reschedule ourselves; a failed snapshot ends the loop.
                let offset = timestamp.duration_since(self.start_time);
                self.last_snapshot
                    .store(offset.as_millis() as u64, Ordering::Relaxed);

                turbo_tasks.schedule_backend_background_job(id);
            }
        }
    })
}

fn try_read_task_output(
Expand Down Expand Up @@ -724,7 +835,7 @@ impl Backend for TurboTasksBackend {
let ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id);
if let Some(content) = get!(task, CellData { cell }) {
Ok(CellContent(Some(content.clone())).into_typed(cell.type_id))
Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
} else {
Ok(CellContent(None).into_typed(cell.type_id))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

use super::{ExecuteContext, Operation};
use crate::data::CachedDataItem;
use crate::data::{CachedDataItem, CachedDataItemKey};

#[derive(Serialize, Deserialize, Clone, Default)]
pub enum ConnectChildOperation {
Expand Down Expand Up @@ -37,11 +37,17 @@ impl Operation for ConnectChildOperation {
ctx.operation_suspend_point(&self);
match self {
ConnectChildOperation::ScheduleTask { task_id } => {
let mut should_schedule;
{
let mut task = ctx.task(task_id);
task.add(CachedDataItem::new_scheduled(task_id));
should_schedule = !task.has_key(&CachedDataItemKey::Output {});
if should_schedule {
should_schedule = task.add(CachedDataItem::new_scheduled(task_id));
}
}
if should_schedule {
ctx.schedule(task_id);
}
ctx.schedule(task_id);

self = ConnectChildOperation::Done;
continue;
Expand Down
Loading
Loading