Skip to content

Commit

Permalink
Use streaming file reads when comparing for writes (vercel#4093)
Browse files Browse the repository at this point in the history
### Description

Following vercel#3526, this reimplements `write(content)`'s file comparison to
stream the contents of the file. Not every file we write is massive, but
some of the ones we generate are 100kb+. I hope this can reduce memory
pressure a bit by using a consistent 8kb block size, instead of
allocating a buffer for the full file contents just to check if we
should write.

### Testing Instructions

Wait for benchmarks.

---------

Co-authored-by: Alex Kirszenberg <[email protected]>
  • Loading branch information
2 people authored and NicholasLYang committed Mar 9, 2023
1 parent eef8234 commit 7c59caf
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 29 deletions.
112 changes: 87 additions & 25 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ pub mod util;

use std::{
borrow::Cow,
cmp::min,
collections::{HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
fs::FileType,
io::{self, ErrorKind},
io::{self, BufRead, ErrorKind},
mem::take,
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
Expand All @@ -47,7 +48,10 @@ use read_glob::read_glob;
pub use read_glob::{ReadGlobResult, ReadGlobResultVc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{fs, io::AsyncReadExt};
use tokio::{
fs,
io::{AsyncBufReadExt, AsyncReadExt, BufReader},
};
use turbo_tasks::{
mark_stateful,
primitives::{BoolVc, StringReadRef, StringVc},
Expand All @@ -56,7 +60,7 @@ use turbo_tasks::{
CompletionVc, Invalidator, ValueToString, ValueToStringVc,
};
use turbo_tasks_hash::hash_xxh3_hash64;
use util::{join_path, normalize_path, sys_to_unix, unix_to_sys};
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};

use self::{json::UnparseableJson, mutex_map::MutexMap};
use crate::{
Expand Down Expand Up @@ -416,26 +420,21 @@ impl Debug for DiskFileSystem {
}
}

/// Reads the file from disk, without converting the contents into a Vc.
async fn read_file(path: PathBuf, mutex_map: &MutexMap<PathBuf>) -> Result<FileContent> {
let _lock = mutex_map.lock(path.clone()).await;
Ok(match retry_future(|| File::from_path(path.clone())).await {
Ok(file) => FileContent::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
Err(e) => {
bail!(anyhow!(e).context(format!("reading file {}", path.display())))
}
})
}

#[turbo_tasks::value_impl]
impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read(&self, fs_path: FileSystemPathVc) -> Result<FileContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path)?;

let content = read_file(full_path, &self.mutex_map).await?;
let _lock = self.mutex_map.lock(full_path.clone()).await;
let content = match retry_future(|| File::from_path(full_path.clone())).await {
Ok(file) => FileContent::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
Err(e) => {
bail!(anyhow!(e).context(format!("reading file {}", full_path.display())))
}
};
Ok(content.cell())
}

Expand Down Expand Up @@ -594,19 +593,19 @@ impl FileSystem for DiskFileSystem {
// Track the file, so that we will rewrite it if it ever changes.
fs_path.track().await?;

// We perform an untracked read here, so that this write is not dependent on the
// read's FileContent value (and the memory it holds). Our untracked read can be
// freed immediately. Given this is an output file, it's unlikely any Turbo code
// will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let old_content = read_file(full_path.clone(), &self.mutex_map).await?;
let _lock = self.mutex_map.lock(full_path.clone()).await;

if *content == old_content {
// We perform an untracked comparison here, so that this write is not dependent
// on a read's FileContentVc (and the memory it holds). Our untracked read can
// be freed immediately. Given this is an output file, it's unlikely any Turbo
// code will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let compare = content.streaming_compare(full_path.clone()).await?;
if compare == FileComparison::Equal {
return Ok(CompletionVc::unchanged());
}
let _lock = self.mutex_map.lock(full_path.clone()).await;

let create_directory = old_content == FileContent::NotFound;
let create_directory = compare == FileComparison::Create;
match &*content {
FileContent::Content(file) => {
if create_directory {
Expand Down Expand Up @@ -1300,6 +1299,69 @@ impl From<File> for FileContentVc {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
enum FileComparison {
Create,
Equal,
NotEqual,
}

impl FileContent {
/// Performs a comparison of self's data against a disk file's streamed
/// read.
async fn streaming_compare(&self, path: PathBuf) -> Result<FileComparison> {
let old_file = extract_disk_access(retry_future(|| fs::File::open(&path)).await, &path)?;
let Some(mut old_file) = old_file else {
return Ok(match self {
FileContent::NotFound => FileComparison::Equal,
_ => FileComparison::Create,
});
};
// We know old file exists, does the new file?
let FileContent::Content(new_file) = self else {
return Ok(FileComparison::NotEqual);
};

let old_meta = extract_disk_access(retry_future(|| old_file.metadata()).await, &path)?;
let Some(old_meta) = old_meta else {
// If we failed to get meta, then the old file has been deleted between the handle open.
// In which case, we just pretend the file never existed.
return Ok(FileComparison::Create);
};
// If the meta is different, we need to rewrite the file to update it.
if new_file.meta != old_meta.into() {
return Ok(FileComparison::NotEqual);
}

// So meta matches, and we have a file handle. Let's stream the contents to see
// if they match.
let mut new_contents = new_file.read();
let mut old_contents = BufReader::new(&mut old_file);
Ok(loop {
let new_chunk = new_contents.fill_buf()?;
let Ok(old_chunk) = old_contents.fill_buf().await else {
break FileComparison::NotEqual;
};

let len = min(new_chunk.len(), old_chunk.len());
if len == 0 {
if new_chunk.len() == old_chunk.len() {
break FileComparison::Equal;
} else {
break FileComparison::NotEqual;
}
}

if new_chunk[0..len] != old_chunk[0..len] {
break FileComparison::NotEqual;
}

new_contents.consume(len);
old_contents.consume(len);
})
}
}

bitflags! {
#[derive(Serialize, Deserialize, TraceRawVcs)]
pub struct LinkType: u8 {
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-fs/src/rope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
borrow::Cow,
cmp::min,
fmt,
io::{self, BufRead, Read, Result as IoResult, Write},
io::{BufRead, Read, Result as IoResult, Write},
mem,
ops::{AddAssign, Deref},
pin::Pin,
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Iterator for RopeReader {
}

impl Read for RopeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
}
}
Expand All @@ -630,7 +630,7 @@ impl AsyncRead for RopeReader {
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
) -> Poll<IoResult<()>> {
let this = self.get_mut();
this.read_internal(buf.remaining(), buf);
Poll::Ready(Ok(()))
Expand Down
19 changes: 18 additions & 1 deletion crates/turbo-tasks-fs/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::borrow::Cow;
use std::{
borrow::Cow,
io::{ErrorKind, Result as IoResult},
path::Path,
};

use anyhow::{anyhow, Result};

/// Joins two /-separated paths into a normalized path.
/// Paths are concatenated with /.
Expand Down Expand Up @@ -110,3 +116,14 @@ pub fn normalize_request(str: &str) -> String {
}
seqments.join("/")
}

/// Converts a disk access Result<T> into a Result<Some<T>>, where a NotFound
/// error results in a None value. This is purely to reduce boilerplate code
/// comparing NotFound errors against all other errors.
pub fn extract_disk_access<T>(value: IoResult<T>, path: &Path) -> Result<Option<T>> {
match value {
Ok(v) => Ok(Some(v)),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(anyhow!(e).context(format!("reading file {}", path.display()))),
}
}

0 comments on commit 7c59caf

Please sign in to comment.