Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use streaming file reads when comparing for writes #4093

Merged
merged 6 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 88 additions & 25 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![feature(main_separator_str)]
#![feature(box_syntax)]
#![feature(round_char_boundary)]
#![feature(new_uninit)]
jridgewell marked this conversation as resolved.
Show resolved Hide resolved

pub mod attach;
pub mod embed;
Expand All @@ -21,10 +22,11 @@ pub mod util;

use std::{
borrow::Cow,
cmp::min,
collections::{HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
fs::FileType,
io::{self, ErrorKind},
io::{self, BufRead, ErrorKind},
mem::take,
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
Expand All @@ -47,7 +49,10 @@ use read_glob::read_glob;
pub use read_glob::{ReadGlobResult, ReadGlobResultVc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{fs, io::AsyncReadExt};
use tokio::{
fs,
io::{AsyncBufReadExt, AsyncReadExt, BufReader},
};
use turbo_tasks::{
mark_stateful,
primitives::{BoolVc, StringReadRef, StringVc},
Expand All @@ -56,7 +61,7 @@ use turbo_tasks::{
CompletionVc, Invalidator, ValueToString, ValueToStringVc,
};
use turbo_tasks_hash::hash_xxh3_hash64;
use util::{join_path, normalize_path, sys_to_unix, unix_to_sys};
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};

use self::{json::UnparseableJson, mutex_map::MutexMap};
use crate::{
Expand Down Expand Up @@ -416,26 +421,21 @@ impl Debug for DiskFileSystem {
}
}

/// Reads the file from disk, without converting the contents into a Vc.
async fn read_file(path: PathBuf, mutex_map: &MutexMap<PathBuf>) -> Result<FileContent> {
let _lock = mutex_map.lock(path.clone()).await;
Ok(match retry_future(|| File::from_path(path.clone())).await {
Ok(file) => FileContent::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
Err(e) => {
bail!(anyhow!(e).context(format!("reading file {}", path.display())))
}
})
}

#[turbo_tasks::value_impl]
impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read(&self, fs_path: FileSystemPathVc) -> Result<FileContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path)?;

let content = read_file(full_path, &self.mutex_map).await?;
let _lock = self.mutex_map.lock(full_path.clone()).await;
let content = match retry_future(|| File::from_path(full_path.clone())).await {
Ok(file) => FileContent::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
Err(e) => {
bail!(anyhow!(e).context(format!("reading file {}", full_path.display())))
}
};
Ok(content.cell())
}

Expand Down Expand Up @@ -594,19 +594,19 @@ impl FileSystem for DiskFileSystem {
// Track the file, so that we will rewrite it if it ever changes.
fs_path.track().await?;

// We perform an untracked read here, so that this write is not dependent on the
// read's FileContent value (and the memory it holds). Our untracked read can be
// freed immediately. Given this is an output file, it's unlikely any Turbo code
// will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let old_content = read_file(full_path.clone(), &self.mutex_map).await?;
let _lock = self.mutex_map.lock(full_path.clone()).await;

if *content == old_content {
// We perform an untracked comparison here, so that this write is not dependent
// on a read's FileContentVc (and the memory it holds). Our untracked read can
// be freed immediately. Given this is an output file, it's unlikely any Turbo
// code will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let compare = content.streaming_compare(full_path.clone()).await?;
if compare == FileComparison::Equal {
return Ok(CompletionVc::unchanged());
}
let _lock = self.mutex_map.lock(full_path.clone()).await;

let create_directory = old_content == FileContent::NotFound;
let create_directory = compare == FileComparison::Create;
match &*content {
FileContent::Content(file) => {
if create_directory {
Expand Down Expand Up @@ -1300,6 +1300,69 @@ impl From<File> for FileContentVc {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
enum FileComparison {
Create,
Equal,
NotEqual,
}

impl FileContent {
/// Performs a comparison of self's data against a disk file's streamed
/// read.
async fn streaming_compare(&self, path: PathBuf) -> Result<FileComparison> {
let old_file = extract_disk_access(retry_future(|| fs::File::open(&path)).await, &path)?;
let Some(mut old_file) = old_file else {
return Ok(match self {
FileContent::NotFound => FileComparison::Equal,
_ => FileComparison::Create,
});
};
// We know old file exists, does the new file?
let FileContent::Content(new_file) = self else {
return Ok(FileComparison::NotEqual);
};

let old_meta = extract_disk_access(retry_future(|| old_file.metadata()).await, &path)?;
let Some(old_meta) = old_meta else {
// If we failed to get meta, then the old file has been deleted between the handle open.
// In which case, we just pretend the file never existed.
return Ok(FileComparison::Create);
};
// If the meta is different, we need to rewrite the file to update it.
if new_file.meta != old_meta.into() {
return Ok(FileComparison::NotEqual);
}

// So meta matches, and we have a file handle. Let's stream the contents to see
// if they match.
let mut new_contents = new_file.read();
let mut old_contents = BufReader::new(&mut old_file);
Ok(loop {
let new_chunk = new_contents.fill_buf()?;
let Ok(old_chunk) = old_contents.fill_buf().await else {
break FileComparison::NotEqual;
};

let len = min(new_chunk.len(), old_chunk.len());
if len == 0 {
if new_chunk.len() == old_chunk.len() {
break FileComparison::Equal;
} else {
break FileComparison::NotEqual;
}
}

if new_chunk[0..len] != old_chunk[0..len] {
break FileComparison::NotEqual;
}

new_contents.consume(len);
old_contents.consume(len);
})
}
}

bitflags! {
#[derive(Serialize, Deserialize, TraceRawVcs)]
pub struct LinkType: u8 {
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-fs/src/rope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
borrow::Cow,
cmp::min,
fmt,
io::{self, BufRead, Read, Result as IoResult, Write},
io::{BufRead, Read, Result as IoResult, Write},
mem,
ops::{AddAssign, Deref},
pin::Pin,
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Iterator for RopeReader {
}

impl Read for RopeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
}
}
Expand All @@ -630,7 +630,7 @@ impl AsyncRead for RopeReader {
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
) -> Poll<IoResult<()>> {
let this = self.get_mut();
this.read_internal(buf.remaining(), buf);
Poll::Ready(Ok(()))
Expand Down
19 changes: 18 additions & 1 deletion crates/turbo-tasks-fs/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::borrow::Cow;
use std::{
borrow::Cow,
io::{ErrorKind, Result as IoResult},
path::Path,
};

use anyhow::{anyhow, Result};

/// Joins two /-separated paths into a normalized path.
/// Paths are concatenated with /.
Expand Down Expand Up @@ -110,3 +116,14 @@ pub fn normalize_request(str: &str) -> String {
}
seqments.join("/")
}

/// Converts a disk access Result<T> into a Result<Some<T>>, where a NotFound
/// error results in a None value. This is purely to reduce boilerplate code
/// comparing NotFound errors against all other errors.
pub fn extract_disk_access<T>(value: IoResult<T>, path: &Path) -> Result<Option<T>> {
match value {
Ok(v) => Ok(Some(v)),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(anyhow!(e).context(format!("reading file {}", path.display()))),
}
}