
Commit c5b0b3b
add lmdb persisting and restoring
sokra committed Aug 8, 2024
1 parent 23d0aa2 commit c5b0b3b
Showing 28 changed files with 574 additions and 103 deletions.
65 changes: 38 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -207,7 +207,7 @@ tokio = "1.25.0"
tokio-util = { version = "0.7.7", features = ["io"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
triomphe = "0.1.13"
triomphe = { git = "https://github.com/sokra/triomphe", branch = "sokra/unstable" }
unicode-segmentation = "1.10.1"
unsize = "1.1.0"
url = "2.2.2"
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -16,8 +16,10 @@ workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
bincode = "1.3.3"
dashmap = { workspace = true }
indexmap = { workspace = true }
lmdb = "0.8.0"
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
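The two new dependencies describe the on-disk format: LMDB as the embedded key-value store and bincode for serializing the cached data. As a rough illustration of how the two crates combine (a sketch under assumed names such as `Store` and a single "data" database, not the commit's actual backing-storage code):

// Sketch only: persist bincode-encoded values into an LMDB database.
// The `Store` type and the "data" database name are illustrative assumptions.
use std::path::Path;

use anyhow::Result;
use lmdb::{Database, DatabaseFlags, Environment, Transaction, WriteFlags};
use serde::{de::DeserializeOwned, Serialize};

struct Store {
    env: Environment,
    db: Database,
}

impl Store {
    // Open (or create) an LMDB environment with one named database.
    // The directory at `path` must already exist.
    fn open(path: &Path) -> Result<Self> {
        let env = Environment::new().set_max_dbs(4).open(path)?;
        let db = env.create_db(Some("data"), DatabaseFlags::empty())?;
        Ok(Self { env, db })
    }

    // Serialize with bincode and write under `key` in a single write transaction.
    fn put<T: Serialize>(&self, key: &[u8], value: &T) -> Result<()> {
        let bytes = bincode::serialize(value)?;
        let mut tx = self.env.begin_rw_txn()?;
        tx.put(self.db, &key, &bytes, WriteFlags::empty())?;
        tx.commit()?;
        Ok(())
    }

    // Read a value back; `None` if the key was never written.
    fn get<T: DeserializeOwned>(&self, key: &[u8]) -> Result<Option<T>> {
        let tx = self.env.begin_ro_txn()?;
        match tx.get(self.db, &key) {
            Ok(bytes) => Ok(Some(bincode::deserialize(bytes)?)),
            Err(lmdb::Error::NotFound) => Ok(None),
            Err(err) => Err(err.into()),
        }
    }
}

The real code presumably batches many such writes per snapshot into one transaction; the sketch keeps one write per call for brevity.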
137 changes: 119 additions & 18 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -4,20 +4,22 @@ mod storage;

use std::{
borrow::Cow,
collections::HashSet,
collections::{HashMap, HashSet},
future::Future,
hash::BuildHasherDefault,
mem::take,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
use auto_hash_map::{AutoMap, AutoSet};
use dashmap::DashMap;
pub use operation::AnyOperation;
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
use smallvec::smallvec;
@@ -32,11 +34,9 @@ use turbo_tasks::{
CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
};

use self::{
operation::{AnyOperation, ExecuteContext},
storage::Storage,
};
use self::{operation::ExecuteContext, storage::Storage};
use crate::{
backing_storage::BackingStorage,
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef,
InProgressState, OutputValue,
@@ -62,6 +62,8 @@ impl SnapshotRequest {
}

pub struct TurboTasksBackend {
start_time: Instant,

persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
transient_task_id_factory: IdFactoryWithReuse<TaskId>,

@@ -87,13 +89,18 @@ pub struct TurboTasksBackend {
/// Condition Variable that is triggered when a snapshot is completed and
/// operations can continue.
snapshot_completed: Condvar,
/// The timestamp of the last started snapshot.
last_snapshot: AtomicU64,

backing_storage: Arc<dyn BackingStorage + Sync + Send>,
}

impl TurboTasksBackend {
pub fn new() -> Self {
pub fn new(backing_storage: Arc<dyn BackingStorage + Sync + Send>) -> Self {
Self {
start_time: Instant::now(),
persisted_task_id_factory: IdFactoryWithReuse::new_with_range(
1,
*backing_storage.next_free_task_id() as u64,
(TRANSIENT_TASK_BIT - 1) as u64,
),
transient_task_id_factory: IdFactoryWithReuse::new_with_range(
@@ -109,6 +116,8 @@ impl TurboTasksBackend {
snapshot_request: Mutex::new(SnapshotRequest::new()),
operations_suspended: Condvar::new(),
snapshot_completed: Condvar::new(),
last_snapshot: AtomicU64::new(0),
backing_storage,
}
}
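The constructor now takes an `Arc<dyn BackingStorage + Sync + Send>` and seeds the persistent task-id factory from it, so restored tasks keep their old ids and new ones are allocated above them. A hedged reconstruction of the trait surface, inferred only from the three calls visible in this diff (`next_free_task_id`, `forward_lookup_task_cache`, `save_snapshot`); the stub types stand in for the crate's real `TaskId`, `CachedTaskType`, `AnyOperation` and `CachedDataUpdate`, and the actual signatures in `backing_storage.rs` may differ:

// Sketch of the interface this backend appears to depend on; not the real trait.
use std::sync::Arc;

use anyhow::Result;

// Stand-ins for crate-internal types (assumptions, not the real definitions).
pub struct TaskId(pub u32);
pub struct CachedTaskType;   // key of the task cache
pub struct AnyOperation;     // a suspended, serializable operation
pub struct CachedDataUpdate; // one pending change to a task's stored data

pub trait BackingStorage {
    // Lowest task id not used by any persisted task, so the in-memory id
    // factory can start allocating directly above the restored range.
    fn next_free_task_id(&self) -> TaskId;

    // Restore the task id of a task type that was persisted in an earlier run.
    fn forward_lookup_task_cache(&self, task_type: &CachedTaskType) -> Option<TaskId>;

    // Write everything collected since the last snapshot in one batch.
    fn save_snapshot(
        &self,
        suspended_operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<(Arc<CachedTaskType>, TaskId)>,
        data_updates: Vec<CachedDataUpdate>,
    ) -> Result<()>;
}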

@@ -284,14 +293,78 @@ impl TurboTasksBackend {
value: (),
});
}
return Ok(Ok(CellContent(Some(content)).into_typed(cell.type_id)));
return Ok(Ok(TypedCellContent(
cell.type_id,
CellContent(Some(content.1)),
)));
}

todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}");
}

fn snapshot(&self) -> Option<Instant> {
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = true;
let active_operations = self
.in_progress_operations
.fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
if active_operations != 0 {
self.operations_suspended
.wait_while(&mut snapshot_request, |_| {
self.in_progress_operations
.load(std::sync::atomic::Ordering::Relaxed)
!= SNAPSHOT_REQUESTED_BIT
});
}
let suspended_operations = snapshot_request
.suspended_operations
.iter()
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
drop(snapshot_request);
let persisted_storage_log = take(&mut *self.persisted_storage_log.lock());
let persisted_task_cache_log = take(&mut *self.persisted_task_cache_log.lock());
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
self.in_progress_operations
.fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
self.snapshot_completed.notify_all();
let snapshot_time = Instant::now();
drop(snapshot_request);

let mut counts: HashMap<TaskId, u32> = HashMap::new();
for CachedDataUpdate { task, .. } in persisted_storage_log.iter() {
*counts.entry(*task).or_default() += 1;
}

if !persisted_task_cache_log.is_empty() || !persisted_storage_log.is_empty() {
if let Err(err) = self.backing_storage.save_snapshot(
suspended_operations,
persisted_task_cache_log,
persisted_storage_log,
) {
println!("Persising failed: {:#?}", err);
return None;
}
println!("Snapshot saved");
}

for (task_id, count) in counts {
self.storage
.access_mut(task_id)
.persistance_state
.finish_persisting_items(count);
}

Some(snapshot_time)
}
}
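For context on the handshake used by `snapshot()` above: the top bit of `in_progress_operations` doubles as the snapshot-request flag, running operations keep the low bits non-zero, and the two condvars hand control back and forth. A self-contained sketch of both sides of that protocol; the operation side is not part of this hunk, so the names `SnapshotGate`, `start_operation` and `finish_operation` are assumptions about its shape:

// Sketch of the suspend/resume protocol; field names mirror the diff, the rest is assumed.
use std::sync::atomic::{AtomicUsize, Ordering};

use parking_lot::{Condvar, Mutex};

// Top bit of the operation counter doubles as the "snapshot requested" flag.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct SnapshotGate {
    in_progress_operations: AtomicUsize,
    snapshot_requested: Mutex<bool>,
    operations_suspended: Condvar,
    snapshot_completed: Condvar,
}

impl SnapshotGate {
    // Operation side: count the operation in, but back off while a snapshot runs.
    fn start_operation(&self) {
        loop {
            let prev = self.in_progress_operations.fetch_add(1, Ordering::Relaxed);
            if prev & SNAPSHOT_REQUESTED_BIT == 0 {
                return; // no snapshot pending, the operation may proceed
            }
            // A snapshot is draining operations: undo the increment, wake the
            // snapshot thread, and wait until the snapshot has completed.
            let mut requested = self.snapshot_requested.lock();
            self.in_progress_operations.fetch_sub(1, Ordering::Relaxed);
            self.operations_suspended.notify_all();
            self.snapshot_completed.wait_while(&mut requested, |r| *r);
        }
    }

    fn finish_operation(&self) {
        let prev = self.in_progress_operations.fetch_sub(1, Ordering::Relaxed);
        if prev & SNAPSHOT_REQUESTED_BIT != 0 {
            // A snapshot is waiting for operations to drain; notify under the lock
            // so the wakeup cannot be lost.
            let _guard = self.snapshot_requested.lock();
            self.operations_suspended.notify_all();
        }
    }

    // Snapshot side, mirroring `snapshot()`: set the bit, wait for the counter to
    // drain down to exactly the bit, do the work, then clear the bit and resume.
    fn with_suspended_operations(&self, work: impl FnOnce()) {
        let mut requested = self.snapshot_requested.lock();
        *requested = true;
        self.in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.operations_suspended.wait_while(&mut requested, |_| {
            self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
        });
        work();
        *requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
    }
}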

impl Backend for TurboTasksBackend {
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
turbo_tasks.schedule_backend_background_job(BackendJobId::from(1));
}

fn get_or_create_persistent_task(
&self,
task_type: CachedTaskType,
@@ -303,6 +376,12 @@ impl Backend for TurboTasksBackend {
return task_id;
}

if let Some(task_id) = self.backing_storage.forward_lookup_task_cache(&task_type) {
let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
self.connect_child(parent_task, task_id, turbo_tasks);
return task_id;
}

let task_type = Arc::new(task_type);
let task_id = self.persisted_task_id_factory.get();
if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
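The new fast path above restores a task id from the backing storage when the in-memory `task_cache` misses, and memoizes the result so later lookups stay in memory. The same two-level pattern in miniature, with toy types (`PersistedStore` is hypothetical, standing in for `forward_lookup_task_cache`):

// Sketch: in-memory DashMap first, persisted store second, cache the hit.
use dashmap::DashMap;

struct PersistedStore;

impl PersistedStore {
    // Stand-in for `backing_storage.forward_lookup_task_cache(..)`.
    fn lookup(&self, key: &str) -> Option<u32> {
        (key == "restored-task").then_some(42)
    }
}

struct TaskCache {
    memory: DashMap<String, u32>,
    persisted: PersistedStore,
}

impl TaskCache {
    fn get_or_lookup(&self, key: &str) -> Option<u32> {
        // Fast path: already known in this process.
        if let Some(id) = self.memory.get(key) {
            return Some(*id);
        }
        // Slow path: restored from a previous run; remember it in memory.
        let id = self.persisted.lookup(key)?;
        self.memory.insert(key.to_string(), id);
        Some(id)
    }
}

fn main() {
    let cache = TaskCache {
        memory: DashMap::new(),
        persisted: PersistedStore,
    };
    assert_eq!(cache.get_or_lookup("restored-task"), Some(42));
    assert_eq!(cache.get_or_lookup("unknown"), None);
}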
@@ -507,7 +586,7 @@ impl Backend for TurboTasksBackend {
task_id: TaskId,
duration: Duration,
memory_usage: usize,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool {
@@ -544,12 +623,34 @@ impl Backend for TurboTasksBackend {
stale
}

fn run_backend_job(
&self,
_: BackendJobId,
_: &dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
todo!()
fn run_backend_job<'a>(
&'a self,
id: BackendJobId,
turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
match *id {
1 => {
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(1);

let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
let last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
let elapsed = last_snapshot.elapsed();
if elapsed < SNAPSHOT_INTERVAL {
tokio::time::sleep(SNAPSHOT_INTERVAL - elapsed).await;
}

if let Some(last_snapshot) = self.snapshot() {
let last_snapshot = last_snapshot.duration_since(self.start_time);
self.last_snapshot
.store(last_snapshot.as_millis() as u64, Ordering::Relaxed);

turbo_tasks.schedule_backend_background_job(id);
}
}
_ => {}
}
})
}

fn try_read_task_output(
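One detail worth spelling out from `run_backend_job` above: `last_snapshot` is an `AtomicU64` holding milliseconds since `start_time`, because an `Instant` cannot be stored in an atomic while an offset from a fixed start can. A minimal sketch of that encode/decode round trip and of the sleep computation (the 1-second interval matches the diff; the `SnapshotClock` name is an assumption):

// Sketch: store a snapshot timestamp lock-free as millis relative to start_time.
use std::{
    sync::atomic::{AtomicU64, Ordering},
    time::{Duration, Instant},
};

struct SnapshotClock {
    start_time: Instant,
    last_snapshot: AtomicU64, // milliseconds since `start_time`
}

impl SnapshotClock {
    // Encode: remember when the last snapshot started.
    fn record(&self, at: Instant) {
        let millis = at.duration_since(self.start_time).as_millis() as u64;
        self.last_snapshot.store(millis, Ordering::Relaxed);
    }

    // Decode: how long the background job should still sleep before the next one.
    fn time_until_next(&self, interval: Duration) -> Duration {
        let last = self.start_time
            + Duration::from_millis(self.last_snapshot.load(Ordering::Relaxed));
        interval.saturating_sub(last.elapsed())
    }
}

fn main() {
    let clock = SnapshotClock {
        start_time: Instant::now(),
        last_snapshot: AtomicU64::new(0),
    };
    clock.record(Instant::now());
    // Right after a snapshot, roughly the whole interval remains.
    println!("sleep for {:?}", clock.time_until_next(Duration::from_secs(1)));
}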
@@ -599,7 +700,7 @@ impl Backend for TurboTasksBackend {
let ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id);
if let Some(content) = get!(task, CellData { cell }) {
Ok(CellContent(Some(content.clone())).into_typed(cell.type_id))
Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
} else {
Ok(CellContent(None).into_typed(cell.type_id))
}
(Diffs for the remaining changed files are not shown here.)
