From 7c24763befc58b6803cdb279d7d96429d0b1efcf Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 2 Dec 2024 16:02:49 +0100 Subject: [PATCH] [Turbopack] Persistent Caching fixups (#73423) ### What? * sync blob files on commit * This could cause a database corruption * add print_stats feature to print stats on shutdown * use memory map to read blob files * shared index and key cache --- turbopack/crates/turbo-persistence/Cargo.toml | 1 + .../crates/turbo-persistence/src/constants.rs | 6 +- turbopack/crates/turbo-persistence/src/db.rs | 52 +++++++++-------- .../src/static_sorted_file.rs | 5 +- .../turbo-persistence/src/write_batch.rs | 56 +++++++++++++++---- 5 files changed, 76 insertions(+), 44 deletions(-) diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index 8783c62d456fb..6e4b023f9cbd2 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT" verify_sst_content = [] strict_checks = [] stats = ["quick_cache/stats"] +print_stats = ["stats"] [dependencies] anyhow = { workspace = true } diff --git a/turbopack/crates/turbo-persistence/src/constants.rs b/turbopack/crates/turbo-persistence/src/constants.rs index af103a4bc95c1..de67de1b8e084 100644 --- a/turbopack/crates/turbo-persistence/src/constants.rs +++ b/turbopack/crates/turbo-persistence/src/constants.rs @@ -21,12 +21,8 @@ pub const DATA_THRESHOLD_PER_COMPACTED_FILE: usize = 256 * 1024 * 1024; pub const AQMF_CACHE_SIZE: u64 = 300 * 1024 * 1024; pub const AQMF_AVG_SIZE: usize = 37399; -/// Maximum RAM bytes for index block cache -pub const INDEX_BLOCK_CACHE_SIZE: u64 = 100 * 1024 * 1024; -pub const INDEX_BLOCK_AVG_SIZE: usize = 152000; - /// Maximum RAM bytes for key block cache -pub const KEY_BLOCK_CACHE_SIZE: u64 = 300 * 1024 * 1024; +pub const KEY_BLOCK_CACHE_SIZE: u64 = 400 * 1024 * 1024; pub const KEY_BLOCK_AVG_SIZE: usize = 16 * 1024; /// Maximum RAM bytes for value block cache diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 2366e5c7f2b65..ba3a0b1cbc972 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -14,6 +14,7 @@ use std::{ use anyhow::{bail, Context, Result}; use byteorder::{ReadBytesExt, WriteBytesExt, BE}; use lzzzz::lz4::decompress; +use memmap2::{Advice, Mmap}; use parking_lot::{Mutex, RwLock}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; @@ -23,9 +24,9 @@ use crate::{ get_compaction_jobs, total_coverage, CompactConfig, Compactable, CompactionJobs, }, constants::{ - AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, INDEX_BLOCK_AVG_SIZE, - INDEX_BLOCK_CACHE_SIZE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE, - MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE, + AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, + KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, + VALUE_BLOCK_CACHE_SIZE, }, key::{hash_key, StoreKey}, lookup_entry::LookupEntry, @@ -34,7 +35,7 @@ use crate::{ AqmfCache, BlockCache, LookupResult, StaticSortedFile, StaticSortedFileRange, }, static_sorted_file_builder::StaticSortedFileBuilder, - write_batch::WriteBatch, + write_batch::{FinishResult, WriteBatch}, QueryKey, }; @@ -77,7 +78,6 @@ impl CacheStatistics { #[derive(Debug)] pub struct Statistics { pub sst_files: usize, - pub index_block_cache: CacheStatistics, pub key_block_cache: CacheStatistics, pub value_block_cache: CacheStatistics, pub aqmf_cache: CacheStatistics, @@ -115,8 +115,6 @@ pub struct TurboPersistence { active_write_operation: AtomicBool, /// A cache for deserialized AQMF filters. aqmf_cache: AqmfCache, - /// A cache for decompressed index blocks. - index_block_cache: BlockCache, /// A cache for decompressed key blocks. key_block_cache: BlockCache, /// A cache for decompressed value blocks. @@ -155,13 +153,6 @@ impl TurboPersistence { Default::default(), Default::default(), ), - index_block_cache: BlockCache::with( - INDEX_BLOCK_CACHE_SIZE as usize / INDEX_BLOCK_AVG_SIZE, - INDEX_BLOCK_CACHE_SIZE, - Default::default(), - Default::default(), - Default::default(), - ), key_block_cache: BlockCache::with( KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE, @@ -338,9 +329,16 @@ impl TurboPersistence { /// Reads and decompresses a blob file. This is not backed by any cache. fn read_blob(&self, seq: u32) -> Result> { let path = self.path.join(format!("{:08}.blob", seq)); - let compressed = - fs::read(path).with_context(|| format!("Unable to read blob file {:08}.blob", seq))?; - let mut compressed = &compressed[..]; + let mmap = unsafe { Mmap::map(&File::open(&path)?)? }; + #[cfg(unix)] + mmap.advise(Advice::Sequential)?; + #[cfg(unix)] + mmap.advise(Advice::WillNeed)?; + #[cfg(target_os = "linux")] + mmap.advise(Advice::DontFork)?; + #[cfg(target_os = "linux")] + mmap.advise(Advice::Unmergeable)?; + let mut compressed = &mmap[..]; let uncompressed_length = compressed.read_u32::()? as usize; let buffer = Arc::new_zeroed_slice(uncompressed_length); @@ -391,8 +389,12 @@ impl TurboPersistence { &self, mut write_batch: WriteBatch, ) -> Result<()> { - let (seq, new_sst_files) = write_batch.finish()?; - self.commit(new_sst_files, vec![], seq)?; + let FinishResult { + sequence_number, + new_sst_files, + new_blob_files, + } = write_batch.finish()?; + self.commit(new_sst_files, new_blob_files, vec![], sequence_number)?; self.active_write_operation.store(false, Ordering::Release); self.idle_write_batch.lock().replace(( TypeId::of::>(), @@ -406,6 +408,7 @@ impl TurboPersistence { fn commit( &self, new_sst_files: Vec<(u32, File)>, + new_blob_files: Vec, mut indicies_to_delete: Vec, mut seq: u32, ) -> Result<(), anyhow::Error> { @@ -417,6 +420,10 @@ impl TurboPersistence { }) .collect::>>()?; + for file in new_blob_files { + file.sync_all()?; + } + if !indicies_to_delete.is_empty() { seq += 1; } @@ -505,6 +512,7 @@ impl TurboPersistence { self.commit( new_sst_files, + Vec::new(), indicies_to_delete, *sequence_number.get_mut(), )?; @@ -780,7 +788,6 @@ impl TurboPersistence { hash, key, &self.aqmf_cache, - &self.index_block_cache, &self.key_block_cache, &self.value_block_cache, )? { @@ -825,7 +832,6 @@ impl TurboPersistence { let inner = self.inner.read(); Statistics { sst_files: inner.static_sorted_files.len(), - index_block_cache: CacheStatistics::new(&self.index_block_cache), key_block_cache: CacheStatistics::new(&self.key_block_cache), value_block_cache: CacheStatistics::new(&self.value_block_cache), aqmf_cache: CacheStatistics::new(&self.aqmf_cache), @@ -839,9 +845,9 @@ impl TurboPersistence { } } - /// Shuts down the database. This will print statistics if the `stats` feature is enabled. + /// Shuts down the database. This will print statistics if the `print_stats` feature is enabled. pub fn shutdown(&self) -> Result<()> { - #[cfg(feature = "stats")] + #[cfg(feature = "print_stats")] println!("{:#?}", self.statistics()); Ok(()) } diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs index 1de537c69ee3d..1d31a292c0091 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs @@ -236,7 +236,6 @@ impl StaticSortedFile { key_hash: u64, key: &K, aqmf_cache: &AqmfCache, - index_block_cache: &BlockCache, key_block_cache: &BlockCache, value_block_cache: &BlockCache, ) -> Result { @@ -270,10 +269,8 @@ impl StaticSortedFile { } } let mut current_block = header.block_count - 1; - let mut cache = index_block_cache; loop { - let block = self.get_key_block(header, current_block, cache)?; - cache = key_block_cache; + let block = self.get_key_block(header, current_block, key_block_cache)?; let mut block = &block[..]; let block_type = block.read_u8()?; match block_type { diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 97490c73e41e5..19bb575ea0528 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -2,6 +2,7 @@ use std::{ borrow::Cow, cell::UnsafeCell, fs::File, + io::Write, mem::{replace, swap}, path::PathBuf, sync::atomic::{AtomicU32, Ordering}, @@ -28,6 +29,15 @@ struct ThreadLocalState { collectors: [Option>; FAMILIES], /// The list of new SST files that have been created. new_sst_files: Vec<(u32, File)>, + /// The list of new blob files that have been created. + new_blob_files: Vec, +} + +/// The result of a `WriteBatch::finish` operation. +pub(crate) struct FinishResult { + pub(crate) sequence_number: u32, + pub(crate) new_sst_files: Vec<(u32, File)>, + pub(crate) new_blob_files: Vec, } /// A write batch. @@ -61,17 +71,27 @@ impl WriteBatch { .store(current, Ordering::SeqCst); } - /// Returns the collector for a family for the current thread. - fn collector_mut(&self, family: usize) -> Result<&mut Collector> { - debug_assert!(family < FAMILIES); + /// Returns the thread local state for the current thread. + #[allow(clippy::mut_from_ref)] + fn thread_local_state(&self) -> &mut ThreadLocalState { let cell = self.thread_locals.get_or(|| { UnsafeCell::new(ThreadLocalState { collectors: [const { None }; FAMILIES], new_sst_files: Vec::new(), + new_blob_files: Vec::new(), }) }); // Safety: We know that the cell is only accessed from the current thread. - let state = unsafe { &mut *cell.get() }; + unsafe { &mut *cell.get() } + } + + /// Returns the collector for a family for the current thread. + fn collector_mut<'l>( + &self, + state: &'l mut ThreadLocalState, + family: usize, + ) -> Result<&'l mut Collector> { + debug_assert!(family < FAMILIES); let collector = state.collectors[family].get_or_insert_with(|| { self.idle_collectors .lock() @@ -88,31 +108,36 @@ impl WriteBatch { /// Puts a key-value pair into the write batch. pub fn put(&self, family: usize, key: K, value: Cow<'_, [u8]>) -> Result<()> { - let collector = self.collector_mut(family)?; + let state = self.thread_local_state(); + let collector = self.collector_mut(state, family)?; if value.len() <= MAX_MEDIUM_VALUE_SIZE { collector.put(key, value.into_owned()); } else { - let blob = self.create_blob(&value)?; + let (blob, file) = self.create_blob(&value)?; collector.put_blob(key, blob); + state.new_blob_files.push(file); } Ok(()) } /// Puts a delete operation into the write batch. pub fn delete(&self, family: usize, key: K) -> Result<()> { - let collector = self.collector_mut(family)?; + let state = self.thread_local_state(); + let collector = self.collector_mut(state, family)?; collector.delete(key); Ok(()) } /// Finishes the write batch by returning the new sequence number and the new SST files. This /// writes all outstanding thread local data to disk. - pub(crate) fn finish(&mut self) -> Result<(u32, Vec<(u32, File)>)> { + pub(crate) fn finish(&mut self) -> Result { let mut new_sst_files = Vec::new(); + let mut new_blob_files = Vec::new(); let mut all_collectors = [(); FAMILIES].map(|_| Vec::new()); for cell in self.thread_locals.iter_mut() { let state = cell.get_mut(); new_sst_files.append(&mut state.new_sst_files); + new_blob_files.append(&mut state.new_blob_files); for (family, global_collector) in all_collectors.iter_mut().enumerate() { if let Some(collector) = state.collectors[family].take() { if !collector.is_empty() { @@ -200,11 +225,15 @@ impl WriteBatch { shared_error.into_inner()?; let seq = self.current_sequence_number.load(Ordering::SeqCst); new_sst_files.sort_by_key(|(seq, _)| *seq); - Ok((seq, new_sst_files)) + Ok(FinishResult { + sequence_number: seq, + new_sst_files, + new_blob_files, + }) } /// Creates a new blob file with the given value. - fn create_blob(&self, value: &[u8]) -> Result { + fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> { let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let mut buffer = Vec::new(); buffer.write_u32::(value.len() as u32)?; @@ -212,8 +241,11 @@ impl WriteBatch { .context("Compression of value for blob file failed")?; let file = self.path.join(format!("{:08}.blob", seq)); - std::fs::write(file, &buffer).context("Unable to write blob file")?; - Ok(seq) + let mut file = File::create(&file).context("Unable to create blob file")?; + file.write_all(&buffer) + .context("Unable to write blob file")?; + file.flush().context("Unable to flush blob file")?; + Ok((seq, file)) } /// Creates a new SST file with the given collector data.