diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index b27426f91dd58b..103bb8884b10f3 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -21,10 +21,11 @@ pub mod util; use std::{ borrow::Cow, + cmp::min, collections::{HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, fs::FileType, - io::{self, ErrorKind}, + io::{self, BufRead, ErrorKind}, mem::take, path::{Path, PathBuf, MAIN_SEPARATOR}, sync::{ @@ -47,7 +48,10 @@ use read_glob::read_glob; pub use read_glob::{ReadGlobResult, ReadGlobResultVc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::{fs, io::AsyncReadExt}; +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncReadExt, BufReader}, +}; use turbo_tasks::{ mark_stateful, primitives::{BoolVc, StringReadRef, StringVc}, @@ -56,7 +60,7 @@ use turbo_tasks::{ CompletionVc, Invalidator, ValueToString, ValueToStringVc, }; use turbo_tasks_hash::hash_xxh3_hash64; -use util::{join_path, normalize_path, sys_to_unix, unix_to_sys}; +use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys}; use self::{json::UnparseableJson, mutex_map::MutexMap}; use crate::{ @@ -416,18 +420,6 @@ impl Debug for DiskFileSystem { } } -/// Reads the file from disk, without converting the contents into a Vc. -async fn read_file(path: PathBuf, mutex_map: &MutexMap) -> Result { - let _lock = mutex_map.lock(path.clone()).await; - Ok(match retry_future(|| File::from_path(path.clone())).await { - Ok(file) => FileContent::new(file), - Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound, - Err(e) => { - bail!(anyhow!(e).context(format!("reading file {}", path.display()))) - } - }) -} - #[turbo_tasks::value_impl] impl FileSystem for DiskFileSystem { #[turbo_tasks::function] @@ -435,7 +427,14 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path)?; - let content = read_file(full_path, &self.mutex_map).await?; + let _lock = self.mutex_map.lock(full_path.clone()).await; + let content = match retry_future(|| File::from_path(full_path.clone())).await { + Ok(file) => FileContent::new(file), + Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound, + Err(e) => { + bail!(anyhow!(e).context(format!("reading file {}", full_path.display()))) + } + }; Ok(content.cell()) } @@ -594,19 +593,19 @@ impl FileSystem for DiskFileSystem { // Track the file, so that we will rewrite it if it ever changes. fs_path.track().await?; - // We perform an untracked read here, so that this write is not dependent on the - // read's FileContent value (and the memory it holds). Our untracked read can be - // freed immediately. Given this is an output file, it's unlikely any Turbo code - // will need to read the file from disk into a FileContentVc, so we're not - // wasting cycles. - let old_content = read_file(full_path.clone(), &self.mutex_map).await?; + let _lock = self.mutex_map.lock(full_path.clone()).await; - if *content == old_content { + // We perform an untracked comparison here, so that this write is not dependent + // on a read's FileContentVc (and the memory it holds). Our untracked read can + // be freed immediately. Given this is an output file, it's unlikely any Turbo + // code will need to read the file from disk into a FileContentVc, so we're not + // wasting cycles. + let compare = content.streaming_compare(full_path.clone()).await?; + if compare == FileComparison::Equal { return Ok(CompletionVc::unchanged()); } - let _lock = self.mutex_map.lock(full_path.clone()).await; - let create_directory = old_content == FileContent::NotFound; + let create_directory = compare == FileComparison::Create; match &*content { FileContent::Content(file) => { if create_directory { @@ -1300,6 +1299,69 @@ impl From for FileContentVc { } } +#[derive(Clone, Debug, Eq, PartialEq)] +enum FileComparison { + Create, + Equal, + NotEqual, +} + +impl FileContent { + /// Performs a comparison of self's data against a disk file's streamed + /// read. + async fn streaming_compare(&self, path: PathBuf) -> Result { + let old_file = extract_disk_access(retry_future(|| fs::File::open(&path)).await, &path)?; + let Some(mut old_file) = old_file else { + return Ok(match self { + FileContent::NotFound => FileComparison::Equal, + _ => FileComparison::Create, + }); + }; + // We know old file exists, does the new file? + let FileContent::Content(new_file) = self else { + return Ok(FileComparison::NotEqual); + }; + + let old_meta = extract_disk_access(retry_future(|| old_file.metadata()).await, &path)?; + let Some(old_meta) = old_meta else { + // If we failed to get meta, then the old file has been deleted between the handle open. + // In which case, we just pretend the file never existed. + return Ok(FileComparison::Create); + }; + // If the meta is different, we need to rewrite the file to update it. + if new_file.meta != old_meta.into() { + return Ok(FileComparison::NotEqual); + } + + // So meta matches, and we have a file handle. Let's stream the contents to see + // if they match. + let mut new_contents = new_file.read(); + let mut old_contents = BufReader::new(&mut old_file); + Ok(loop { + let new_chunk = new_contents.fill_buf()?; + let Ok(old_chunk) = old_contents.fill_buf().await else { + break FileComparison::NotEqual; + }; + + let len = min(new_chunk.len(), old_chunk.len()); + if len == 0 { + if new_chunk.len() == old_chunk.len() { + break FileComparison::Equal; + } else { + break FileComparison::NotEqual; + } + } + + if new_chunk[0..len] != old_chunk[0..len] { + break FileComparison::NotEqual; + } + + new_contents.consume(len); + old_contents.consume(len); + }) + } +} + bitflags! { #[derive(Serialize, Deserialize, TraceRawVcs)] pub struct LinkType: u8 { diff --git a/crates/turbo-tasks-fs/src/rope.rs b/crates/turbo-tasks-fs/src/rope.rs index 88e2565ae82938..d3bd964152ab88 100644 --- a/crates/turbo-tasks-fs/src/rope.rs +++ b/crates/turbo-tasks-fs/src/rope.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, cmp::min, fmt, - io::{self, BufRead, Read, Result as IoResult, Write}, + io::{BufRead, Read, Result as IoResult, Write}, mem, ops::{AddAssign, Deref}, pin::Pin, @@ -620,7 +620,7 @@ impl Iterator for RopeReader { } impl Read for RopeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { + fn read(&mut self, buf: &mut [u8]) -> IoResult { Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf))) } } @@ -630,7 +630,7 @@ impl AsyncRead for RopeReader { self: Pin<&mut Self>, _cx: &mut TaskContext<'_>, buf: &mut ReadBuf<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.get_mut(); this.read_internal(buf.remaining(), buf); Poll::Ready(Ok(())) diff --git a/crates/turbo-tasks-fs/src/util.rs b/crates/turbo-tasks-fs/src/util.rs index 39f4f2b0d6a96f..b23505e329de27 100644 --- a/crates/turbo-tasks-fs/src/util.rs +++ b/crates/turbo-tasks-fs/src/util.rs @@ -1,4 +1,10 @@ -use std::borrow::Cow; +use std::{ + borrow::Cow, + io::{ErrorKind, Result as IoResult}, + path::Path, +}; + +use anyhow::{anyhow, Result}; /// Joins two /-separated paths into a normalized path. /// Paths are concatenated with /. @@ -110,3 +116,14 @@ pub fn normalize_request(str: &str) -> String { } seqments.join("/") } + +/// Converts a disk access Result into a Result>, where a NotFound +/// error results in a None value. This is purely to reduce boilerplate code +/// comparing NotFound errors against all other errors. +pub fn extract_disk_access(value: IoResult, path: &Path) -> Result> { + match value { + Ok(v) => Ok(Some(v)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(anyhow!(e).context(format!("reading file {}", path.display()))), + } +}