diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index a5e5b5517efaee..75cdc3f4e062fe 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -6,6 +6,7 @@ #![feature(main_separator_str)] #![feature(box_syntax)] #![feature(round_char_boundary)] +#![feature(new_uninit)] pub mod attach; pub mod embed; @@ -21,10 +22,11 @@ pub mod util; use std::{ borrow::Cow, + cmp::min, collections::{HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, fs::FileType, - io::{self, ErrorKind}, + io::{self, BufRead, ErrorKind}, mem::take, path::{Path, PathBuf, MAIN_SEPARATOR}, sync::{ @@ -47,7 +49,10 @@ use read_glob::read_glob; pub use read_glob::{ReadGlobResult, ReadGlobResultVc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::{fs, io::AsyncReadExt}; +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncReadExt}, +}; use turbo_tasks::{ mark_stateful, primitives::{BoolVc, StringReadRef, StringVc}, @@ -56,7 +61,9 @@ use turbo_tasks::{ CompletionVc, Invalidator, ValueToString, ValueToStringVc, }; use turbo_tasks_hash::hash_xxh3_hash64; -use util::{join_path, normalize_path, sys_to_unix, unix_to_sys}; +use util::{ + extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys, AsyncBufReader, +}; use self::{json::UnparseableJson, mutex_map::MutexMap}; use crate::{ @@ -312,18 +319,6 @@ impl Debug for DiskFileSystem { } } -/// Reads the file from disk, without converting the contents into a Vc. 
-async fn read_file(path: PathBuf, mutex_map: &MutexMap<PathBuf>) -> Result<FileContent> { - let _lock = mutex_map.lock(path.clone()).await; - Ok(match retry_future(|| File::from_path(path.clone())).await { - Ok(file) => FileContent::new(file), - Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound, - Err(e) => { - bail!(anyhow!(e).context(format!("reading file {}", path.display()))) - } - }) -} - #[turbo_tasks::value_impl] impl FileSystem for DiskFileSystem { #[turbo_tasks::function] @@ -331,7 +326,14 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path, true); - let content = read_file(full_path, &self.mutex_map).await?; + let _lock = self.mutex_map.lock(full_path.clone()).await; + let content = match retry_future(|| File::from_path(full_path.clone())).await { + Ok(file) => FileContent::new(file), + Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound, + Err(e) => { + bail!(anyhow!(e).context(format!("reading file {}", full_path.display()))) + } + }; Ok(content.cell()) } @@ -490,19 +492,19 @@ impl FileSystem for DiskFileSystem { // Track the file, so that we will rewrite it if it ever changes. fs_path.track().await?; - // We perform an untracked read here, so that this write is not dependent on the - // read's FileContent value (and the memory it holds). Our untracked read can be - // freed immediately. Given this is an output file, it's unlikely any Turbo code - // will need to read the file from disk into a FileContentVc, so we're not - // wasting cycles. - let old_content = read_file(full_path.clone(), &self.mutex_map).await?; + let _lock = self.mutex_map.lock(full_path.clone()).await; - if *content == old_content { + // We perform an untracked comparison here, so that this write is not dependent + // on a read's FileContentVc (and the memory it holds). Our untracked read can + // be freed immediately. 
Given this is an output file, it's unlikely any Turbo + // code will need to read the file from disk into a FileContentVc, so we're not + // wasting cycles. + let compare = content.streaming_compare(full_path.clone()).await?; + if compare == FileComparison::Equal { return Ok(CompletionVc::unchanged()); } - let _lock = self.mutex_map.lock(full_path.clone()).await; - let create_directory = old_content == FileContent::NotFound; + let create_directory = compare == FileComparison::Create; match &*content { FileContent::Content(file) => { if create_directory { @@ -1196,6 +1198,69 @@ impl From<File> for FileContentVc { } } +#[derive(Clone, Debug, Eq, PartialEq)] +enum FileComparison { + Create, + Equal, + NotEqual, +} + +impl FileContent { + /// Performs a comparison of self's data against a disk file's streamed + /// read. + async fn streaming_compare(&self, path: PathBuf) -> Result<FileComparison> { + let old_file = extract_disk_access(retry_future(|| fs::File::open(&path)).await, &path)?; + let Some(mut old_file) = old_file else { + return Ok(match self { + FileContent::NotFound => FileComparison::Equal, + _ => FileComparison::Create, + }); + }; + // We know old file exists, does the new file? + let FileContent::Content(new_file) = self else { + return Ok(FileComparison::NotEqual); + }; + + let old_meta = extract_disk_access(retry_future(|| old_file.metadata()).await, &path)?; + let Some(old_meta) = old_meta else { + // If fetching metadata reports NotFound, the old file was deleted after we opened the handle. + // In that case, we just pretend the file never existed. + return Ok(FileComparison::Create); + }; + // If the meta is different, we need to rewrite the file to update it. + if new_file.meta != old_meta.into() { + return Ok(FileComparison::NotEqual); + } + + // So meta matches, and we have a file handle. Let's stream the contents to see + // if they match. 
+ let mut new_contents = new_file.read(); + let mut old_contents = AsyncBufReader::new(&mut old_file); + Ok(loop { + let new_chunk = new_contents.fill_buf()?; + let Ok(old_chunk) = old_contents.fill_buf().await else { + break FileComparison::NotEqual; + }; + + let len = min(new_chunk.len(), old_chunk.len()); + if len == 0 { + if new_chunk.len() == old_chunk.len() { + break FileComparison::Equal; + } else { + break FileComparison::NotEqual; + } + } + + if new_chunk[0..len] != old_chunk[0..len] { + break FileComparison::NotEqual; + } + + new_contents.consume(len); + old_contents.consume(len); + }) + } +} + bitflags! { #[derive(Serialize, Deserialize, TraceRawVcs)] pub struct LinkType: u8 { diff --git a/crates/turbo-tasks-fs/src/rope.rs b/crates/turbo-tasks-fs/src/rope.rs index 83c5be299ce0bd..8696915b63d151 100644 --- a/crates/turbo-tasks-fs/src/rope.rs +++ b/crates/turbo-tasks-fs/src/rope.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, cmp::min, fmt, - io::{self, BufRead, Read, Result as IoResult, Write}, + io::{BufRead, Read, Result as IoResult, Write}, mem, ops::{AddAssign, Deref}, pin::Pin, @@ -620,7 +620,7 @@ impl Iterator for RopeReader { } impl Read for RopeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf))) } } @@ -630,7 +630,7 @@ impl AsyncRead for RopeReader { self: Pin<&mut Self>, _cx: &mut TaskContext<'_>, buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { + ) -> Poll<IoResult<()>> { let this = self.get_mut(); this.read_internal(buf.remaining(), buf); Poll::Ready(Ok(())) diff --git a/crates/turbo-tasks-fs/src/util.rs b/crates/turbo-tasks-fs/src/util.rs index 39f4f2b0d6a96f..605a5d2217162e 100644 --- a/crates/turbo-tasks-fs/src/util.rs +++ b/crates/turbo-tasks-fs/src/util.rs @@ -1,4 +1,20 @@ -use std::borrow::Cow; +use std::{ + borrow::Cow, + io::{ErrorKind, Result as IoResult}, + path::Path, + pin::Pin, + task::{Context as TaskContext, Poll}, +}; + +use 
anyhow::{anyhow, Result}; +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; + +// https://github.com/rust-lang/rust/blob/13471d3b2046cce78181dde6cfc146c09f55e29e/library/std/src/sys_common/io.rs#L1-L3 +const DEFAULT_BUF_SIZE: usize = if cfg!(target_os = "espidf") { + 512 +} else { + 8 * 1024 +}; /// Joins two /-separated paths into a normalized path. /// Paths are concatenated with /. @@ -110,3 +126,84 @@ pub fn normalize_request(str: &str) -> String { } seqments.join("/") } + +/// AsyncBufReader adds buffering to any [AsyncRead]. +/// +/// This essentially just implements [AsyncBufRead] over an [AsyncRead], using a +/// large buffer to store data. As the data is consumed, an offset buffer will +/// continue to be returned until the full buffer has been consumed. This allows +/// us to skip the overhead of, e.g., repeated sys calls to read from disk as we +/// process a smaller number of bytes. +pub struct AsyncBufReader<'a, T: AsyncRead + Unpin + Sized> { + inner: &'a mut T, + offset: usize, + capacity: usize, + buffer: [u8; DEFAULT_BUF_SIZE], +} + +impl<'a, T: AsyncRead + Unpin + Sized> AsyncBufReader<'a, T> { + /// Wraps inner with an empty buffer; nothing is read until the first poll. + pub fn new(inner: &'a mut T) -> Self { + AsyncBufReader { + inner, + offset: 0, + capacity: 0, + buffer: [0; DEFAULT_BUF_SIZE], + } + } +} + +impl<'a, T: AsyncRead + Unpin + Sized> AsyncRead for AsyncBufReader<'a, T> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<IoResult<()>> { + let this = self.get_mut(); + // Serve bytes already buffered by poll_fill_buf first; reading straight + // from inner would silently skip them. 
+ if this.offset < this.capacity { + let pending = &this.buffer[this.offset..this.capacity]; + let len = pending.len().min(buf.remaining()); + buf.put_slice(&pending[..len]); + this.offset += len; + return Poll::Ready(Ok(())); + } + Pin::new(&mut *this.inner).poll_read(cx, buf) + } +} + +impl<'a, T: AsyncRead + Unpin + Sized> AsyncBufRead for AsyncBufReader<'a, T> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<IoResult<&[u8]>> { + let this = self.get_mut(); + // Only refill from inner once every buffered byte has been consumed. + if this.offset >= this.capacity { + let inner = Pin::new(&mut this.inner); + let mut buf = ReadBuf::new(&mut this.buffer); + match inner.poll_read(cx, &mut buf) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => 
return Poll::Pending, + }; + + this.capacity = buf.filled().len(); + this.offset = 0; + } + Poll::Ready(Ok(&this.buffer[this.offset..this.capacity])) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.get_mut().offset += amt; + } +} + +/// Converts a disk access Result<T> into a Result<Option<T>>, where a NotFound +/// error results in a None value. This is purely to reduce boilerplate code +/// when distinguishing NotFound errors from all other errors. +pub fn extract_disk_access<T>(value: IoResult<T>, path: &Path) -> Result<Option<T>> { + match value { + Ok(v) => Ok(Some(v)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(anyhow!(e).context(format!("reading file {}", path.display()))), + } +}