[Turbopack] Persistent Caching fixups (#73423)
### What?

* Sync blob files on commit
  * Previously they were not synced, which could corrupt the database (see the durability sketch below)
* Add a `print_stats` feature to print statistics on shutdown
* Use a memory map to read blob files
* Use a single shared cache for index and key blocks
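For the first bullet, a minimal sketch of the durability ordering the fix establishes — `finish_blob` and `commit_blobs` are illustrative names, not the committed code (which lives in `write_batch.rs` and `db.rs` below): the blob is written and flushed when created, and the still-open handle is fsynced during commit, before the commit record is written.

```rust
use std::{fs::File, io::Write, path::Path};

// Sketch only. Writing + flushing pushes bytes to the OS; only sync_all()
// guarantees they are on disk, so it must happen before the commit record
// references the blob file.
fn finish_blob(path: &Path, compressed: &[u8]) -> std::io::Result<File> {
    let mut file = File::create(path)?;
    file.write_all(compressed)?;
    file.flush()?; // buffered bytes reach the OS, not necessarily the disk
    Ok(file)       // keep the handle so commit can fsync it later
}

fn commit_blobs(new_blob_files: Vec<File>) -> std::io::Result<()> {
    for file in new_blob_files {
        file.sync_all()?; // durable on disk before the commit is recorded
    }
    Ok(())
}
```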
sokra authored Dec 2, 2024
1 parent c95ed85 commit 7c24763
Showing 5 changed files with 76 additions and 44 deletions.
1 change: 1 addition & 0 deletions turbopack/crates/turbo-persistence/Cargo.toml
@@ -8,6 +8,7 @@ license = "MIT"
verify_sst_content = []
strict_checks = []
stats = ["quick_cache/stats"]
print_stats = ["stats"]

[dependencies]
anyhow = { workspace = true }
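The new `print_stats` feature transitively enables `stats`, so statistics collection is always compiled in whenever printing is requested. A small sketch of how such a pair of feature gates composes (assumed consumer code, not from this repo):

```rust
// `print_stats = ["stats"]` means building with --features print_stats
// also turns on `stats`, so the printing code can rely on it.
#[cfg(feature = "stats")]
fn collect_stats() -> String {
    "hit rates, sizes, ...".to_string() // only compiled with `stats`
}

#[cfg(feature = "print_stats")]
fn print_stats_on_shutdown() {
    println!("{}", collect_stats()); // safe: `print_stats` implies `stats`
}
```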
6 changes: 1 addition & 5 deletions turbopack/crates/turbo-persistence/src/constants.rs
@@ -21,12 +21,8 @@ pub const DATA_THRESHOLD_PER_COMPACTED_FILE: usize = 256 * 1024 * 1024;
pub const AQMF_CACHE_SIZE: u64 = 300 * 1024 * 1024;
pub const AQMF_AVG_SIZE: usize = 37399;

/// Maximum RAM bytes for index block cache
pub const INDEX_BLOCK_CACHE_SIZE: u64 = 100 * 1024 * 1024;
pub const INDEX_BLOCK_AVG_SIZE: usize = 152000;

/// Maximum RAM bytes for key block cache
pub const KEY_BLOCK_CACHE_SIZE: u64 = 300 * 1024 * 1024;
pub const KEY_BLOCK_CACHE_SIZE: u64 = 400 * 1024 * 1024;
pub const KEY_BLOCK_AVG_SIZE: usize = 16 * 1024;

/// Maximum RAM bytes for value block cache
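These constants size the caches by weight: the `*_CACHE_SIZE` value is a byte budget, and the expected entry count is derived from the average item size, mirroring the `BlockCache::with` call in `db.rs` below. A quick sketch of the arithmetic:

```rust
// Sizing sketch using the updated constants: the estimated entry count is
// the byte budget divided by the average decompressed block size.
const KEY_BLOCK_CACHE_SIZE: u64 = 400 * 1024 * 1024; // 400 MiB budget
const KEY_BLOCK_AVG_SIZE: usize = 16 * 1024;         // ~16 KiB per block

fn estimated_entries() -> usize {
    KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE // = 25_600 entries
}
```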
52 changes: 29 additions & 23 deletions turbopack/crates/turbo-persistence/src/db.rs
@@ -14,6 +14,7 @@ use std::{
use anyhow::{bail, Context, Result};
use byteorder::{ReadBytesExt, WriteBytesExt, BE};
use lzzzz::lz4::decompress;
use memmap2::{Advice, Mmap};

> **Check failure on line 17 in turbopack/crates/turbo-persistence/src/db.rs** (GitHub Actions: `stable - aarch64-pc-windows-msvc - node@16` and `stable - x86_64-pc-windows-msvc - node@16`): unresolved import `memmap2::Advice`
use parking_lot::{Mutex, RwLock};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};

@@ -23,9 +24,9 @@ use crate::{
get_compaction_jobs, total_coverage, CompactConfig, Compactable, CompactionJobs,
},
constants::{
AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, INDEX_BLOCK_AVG_SIZE,
INDEX_BLOCK_CACHE_SIZE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
VALUE_BLOCK_CACHE_SIZE,
},
key::{hash_key, StoreKey},
lookup_entry::LookupEntry,
@@ -34,7 +35,7 @@ use crate::{
AqmfCache, BlockCache, LookupResult, StaticSortedFile, StaticSortedFileRange,
},
static_sorted_file_builder::StaticSortedFileBuilder,
write_batch::WriteBatch,
write_batch::{FinishResult, WriteBatch},
QueryKey,
};

@@ -77,7 +78,6 @@ impl CacheStatistics {
#[derive(Debug)]
pub struct Statistics {
pub sst_files: usize,
pub index_block_cache: CacheStatistics,
pub key_block_cache: CacheStatistics,
pub value_block_cache: CacheStatistics,
pub aqmf_cache: CacheStatistics,
Expand Down Expand Up @@ -115,8 +115,6 @@ pub struct TurboPersistence {
active_write_operation: AtomicBool,
/// A cache for deserialized AQMF filters.
aqmf_cache: AqmfCache,
/// A cache for decompressed index blocks.
index_block_cache: BlockCache,
/// A cache for decompressed key blocks.
key_block_cache: BlockCache,
/// A cache for decompressed value blocks.
@@ -155,13 +153,6 @@ impl TurboPersistence {
Default::default(),
Default::default(),
),
index_block_cache: BlockCache::with(
INDEX_BLOCK_CACHE_SIZE as usize / INDEX_BLOCK_AVG_SIZE,
INDEX_BLOCK_CACHE_SIZE,
Default::default(),
Default::default(),
Default::default(),
),
key_block_cache: BlockCache::with(
KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
KEY_BLOCK_CACHE_SIZE,
@@ -338,9 +329,16 @@ impl TurboPersistence {
/// Reads and decompresses a blob file. This is not backed by any cache.
fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
let path = self.path.join(format!("{:08}.blob", seq));
let compressed =
fs::read(path).with_context(|| format!("Unable to read blob file {:08}.blob", seq))?;
let mut compressed = &compressed[..];
let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
#[cfg(unix)]
mmap.advise(Advice::Sequential)?;
#[cfg(unix)]
mmap.advise(Advice::WillNeed)?;
#[cfg(target_os = "linux")]
mmap.advise(Advice::DontFork)?;
#[cfg(target_os = "linux")]
mmap.advise(Advice::Unmergeable)?;
let mut compressed = &mmap[..];
let uncompressed_length = compressed.read_u32::<BE>()? as usize;

let buffer = Arc::new_zeroed_slice(uncompressed_length);
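The Windows check failures above stem from the unconditional `use memmap2::{Advice, Mmap};`: `Advice` and `Mmap::advise` exist only on Unix in memmap2. A hedged sketch of a portable variant that gates the import along with the calls (not the committed code):

```rust
use std::fs::File;
use memmap2::Mmap;

// Sketch: same mmap-based blob read, but the Unix-only Advice import
// lives inside the cfg block so Windows targets still compile.
fn map_blob(path: &std::path::Path) -> anyhow::Result<Mmap> {
    // Safety: blob files are written once and never mutated, so mapping
    // is sound as long as nothing truncates the file while it is mapped.
    let mmap = unsafe { Mmap::map(&File::open(path)?)? };
    #[cfg(unix)]
    {
        use memmap2::Advice;
        mmap.advise(Advice::Sequential)?; // blob is read front to back
        mmap.advise(Advice::WillNeed)?;   // hint the OS to prefetch pages
    }
    Ok(mmap)
}
```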
@@ -391,8 +389,12 @@ impl TurboPersistence {
&self,
mut write_batch: WriteBatch<K, FAMILIES>,
) -> Result<()> {
let (seq, new_sst_files) = write_batch.finish()?;
self.commit(new_sst_files, vec![], seq)?;
let FinishResult {
sequence_number,
new_sst_files,
new_blob_files,
} = write_batch.finish()?;
self.commit(new_sst_files, new_blob_files, vec![], sequence_number)?;
self.active_write_operation.store(false, Ordering::Release);
self.idle_write_batch.lock().replace((
TypeId::of::<WriteBatch<K, FAMILIES>>(),
@@ -406,6 +408,7 @@ impl TurboPersistence {
fn commit(
&self,
new_sst_files: Vec<(u32, File)>,
new_blob_files: Vec<File>,
mut indicies_to_delete: Vec<usize>,
mut seq: u32,
) -> Result<(), anyhow::Error> {
@@ -417,6 +420,10 @@ impl TurboPersistence {
})
.collect::<Result<Vec<_>>>()?;

for file in new_blob_files {
file.sync_all()?;
}

if !indicies_to_delete.is_empty() {
seq += 1;
}
@@ -505,6 +512,7 @@ impl TurboPersistence {

self.commit(
new_sst_files,
Vec::new(),
indicies_to_delete,
*sequence_number.get_mut(),
)?;
@@ -780,7 +788,6 @@ impl TurboPersistence {
hash,
key,
&self.aqmf_cache,
&self.index_block_cache,
&self.key_block_cache,
&self.value_block_cache,
)? {
@@ -825,7 +832,6 @@ impl TurboPersistence {
let inner = self.inner.read();
Statistics {
sst_files: inner.static_sorted_files.len(),
index_block_cache: CacheStatistics::new(&self.index_block_cache),
key_block_cache: CacheStatistics::new(&self.key_block_cache),
value_block_cache: CacheStatistics::new(&self.value_block_cache),
aqmf_cache: CacheStatistics::new(&self.aqmf_cache),
@@ -839,9 +845,9 @@ impl TurboPersistence {
}
}

/// Shuts down the database. This will print statistics if the `stats` feature is enabled.
/// Shuts down the database. This will print statistics if the `print_stats` feature is enabled.
pub fn shutdown(&self) -> Result<()> {
#[cfg(feature = "stats")]
#[cfg(feature = "print_stats")]
println!("{:#?}", self.statistics());
Ok(())
}
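A hypothetical end-to-end usage of the new gate — the `open` constructor and the crate path are assumptions, not shown in this diff:

```rust
use std::path::PathBuf;
use anyhow::Result;
use turbo_persistence::TurboPersistence; // assumed crate path

// Build with `--features print_stats` to see the statistics dump.
fn run(path: PathBuf) -> Result<()> {
    let db = TurboPersistence::open(path)?; // `open` signature assumed
    // ... reads and write batches ...
    db.shutdown()?; // prints `Statistics` when `print_stats` is enabled
    Ok(())
}
```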
5 changes: 1 addition & 4 deletions turbopack/crates/turbo-persistence/src/static_sorted_file.rs
@@ -236,7 +236,6 @@ impl StaticSortedFile {
key_hash: u64,
key: &K,
aqmf_cache: &AqmfCache,
index_block_cache: &BlockCache,
key_block_cache: &BlockCache,
value_block_cache: &BlockCache,
) -> Result<LookupResult> {
@@ -270,10 +269,8 @@
}
}
let mut current_block = header.block_count - 1;
let mut cache = index_block_cache;
loop {
let block = self.get_key_block(header, current_block, cache)?;
cache = key_block_cache;
let block = self.get_key_block(header, current_block, key_block_cache)?;
let mut block = &block[..];
let block_type = block.read_u8()?;
match block_type {
56 changes: 44 additions & 12 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -2,6 +2,7 @@ use std::{
borrow::Cow,
cell::UnsafeCell,
fs::File,
io::Write,
mem::{replace, swap},
path::PathBuf,
sync::atomic::{AtomicU32, Ordering},
Expand All @@ -28,6 +29,15 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
collectors: [Option<Collector<K>>; FAMILIES],
/// The list of new SST files that have been created.
new_sst_files: Vec<(u32, File)>,
/// The list of new blob files that have been created.
new_blob_files: Vec<File>,
}

/// The result of a `WriteBatch::finish` operation.
pub(crate) struct FinishResult {
pub(crate) sequence_number: u32,
pub(crate) new_sst_files: Vec<(u32, File)>,
pub(crate) new_blob_files: Vec<File>,
}

/// A write batch.
@@ -61,17 +71,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
.store(current, Ordering::SeqCst);
}

/// Returns the collector for a family for the current thread.
fn collector_mut(&self, family: usize) -> Result<&mut Collector<K>> {
debug_assert!(family < FAMILIES);
/// Returns the thread local state for the current thread.
#[allow(clippy::mut_from_ref)]
fn thread_local_state(&self) -> &mut ThreadLocalState<K, FAMILIES> {
let cell = self.thread_locals.get_or(|| {
UnsafeCell::new(ThreadLocalState {
collectors: [const { None }; FAMILIES],
new_sst_files: Vec::new(),
new_blob_files: Vec::new(),
})
});
// Safety: We know that the cell is only accessed from the current thread.
let state = unsafe { &mut *cell.get() };
unsafe { &mut *cell.get() }
}
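`thread_local_state` centralizes a pattern worth spelling out: per-thread mutable state handed out through a shared `&self`, which is why the `clippy::mut_from_ref` lint is allowed. A minimal self-contained sketch of that pattern, assuming the `thread_local` crate used here:

```rust
use std::cell::UnsafeCell;
use thread_local::ThreadLocal;

// Each thread lazily creates its own slot; handing out &mut from &self is
// sound because a slot is only ever reachable from its owning thread.
struct PerThread<T: Send + Default> {
    slots: ThreadLocal<UnsafeCell<T>>,
}

impl<T: Send + Default> PerThread<T> {
    #[allow(clippy::mut_from_ref)]
    fn get_mut(&self) -> &mut T {
        let cell = self.slots.get_or(|| UnsafeCell::new(T::default()));
        // Safety: `get_or` returns the current thread's cell exclusively.
        unsafe { &mut *cell.get() }
    }
}
```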

/// Returns the collector for a family for the current thread.
fn collector_mut<'l>(
&self,
state: &'l mut ThreadLocalState<K, FAMILIES>,
family: usize,
) -> Result<&'l mut Collector<K>> {
debug_assert!(family < FAMILIES);
let collector = state.collectors[family].get_or_insert_with(|| {
self.idle_collectors
.lock()
@@ -88,31 +108,36 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

/// Puts a key-value pair into the write batch.
pub fn put(&self, family: usize, key: K, value: Cow<'_, [u8]>) -> Result<()> {
let collector = self.collector_mut(family)?;
let state = self.thread_local_state();
let collector = self.collector_mut(state, family)?;
if value.len() <= MAX_MEDIUM_VALUE_SIZE {
collector.put(key, value.into_owned());
} else {
let blob = self.create_blob(&value)?;
let (blob, file) = self.create_blob(&value)?;
collector.put_blob(key, blob);
state.new_blob_files.push(file);
}
Ok(())
}

/// Puts a delete operation into the write batch.
pub fn delete(&self, family: usize, key: K) -> Result<()> {
let collector = self.collector_mut(family)?;
let state = self.thread_local_state();
let collector = self.collector_mut(state, family)?;
collector.delete(key);
Ok(())
}

/// Finishes the write batch by returning the new sequence number and the new SST files. This
/// writes all outstanding thread local data to disk.
pub(crate) fn finish(&mut self) -> Result<(u32, Vec<(u32, File)>)> {
pub(crate) fn finish(&mut self) -> Result<FinishResult> {
let mut new_sst_files = Vec::new();
let mut new_blob_files = Vec::new();
let mut all_collectors = [(); FAMILIES].map(|_| Vec::new());
for cell in self.thread_locals.iter_mut() {
let state = cell.get_mut();
new_sst_files.append(&mut state.new_sst_files);
new_blob_files.append(&mut state.new_blob_files);
for (family, global_collector) in all_collectors.iter_mut().enumerate() {
if let Some(collector) = state.collectors[family].take() {
if !collector.is_empty() {
@@ -200,20 +225,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
shared_error.into_inner()?;
let seq = self.current_sequence_number.load(Ordering::SeqCst);
new_sst_files.sort_by_key(|(seq, _)| *seq);
Ok((seq, new_sst_files))
Ok(FinishResult {
sequence_number: seq,
new_sst_files,
new_blob_files,
})
}

/// Creates a new blob file with the given value.
fn create_blob(&self, value: &[u8]) -> Result<u32> {
fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
let mut buffer = Vec::new();
buffer.write_u32::<BE>(value.len() as u32)?;
lz4::compress_to_vec(value, &mut buffer, ACC_LEVEL_DEFAULT)
.context("Compression of value for blob file failed")?;

let file = self.path.join(format!("{:08}.blob", seq));
std::fs::write(file, &buffer).context("Unable to write blob file")?;
Ok(seq)
let mut file = File::create(&file).context("Unable to create blob file")?;
file.write_all(&buffer)
.context("Unable to write blob file")?;
file.flush().context("Unable to flush blob file")?;
Ok((seq, file))
}

/// Creates a new SST file with the given collector data.
