Skip to content

Commit

Permalink
Merge f6f27fe into 9d49e67
Browse files Browse the repository at this point in the history
  • Loading branch information
jridgewell authored Mar 7, 2023
2 parents 9d49e67 + f6f27fe commit 6fd5d29
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 29 deletions.
115 changes: 90 additions & 25 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![feature(main_separator_str)]
#![feature(box_syntax)]
#![feature(round_char_boundary)]
#![feature(new_uninit)]

pub mod attach;
pub mod embed;
Expand All @@ -21,10 +22,11 @@ pub mod util;

use std::{
borrow::Cow,
cmp::min,
collections::{HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
fs::FileType,
io::{self, ErrorKind},
io::{self, BufRead, ErrorKind},
mem::take,
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
Expand All @@ -47,7 +49,10 @@ use read_glob::read_glob;
pub use read_glob::{ReadGlobResult, ReadGlobResultVc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{fs, io::AsyncReadExt};
use tokio::{
fs,
io::{AsyncBufReadExt, AsyncReadExt},
};
use turbo_tasks::{
mark_stateful,
primitives::{BoolVc, StringReadRef, StringVc},
Expand All @@ -56,7 +61,9 @@ use turbo_tasks::{
CompletionVc, Invalidator, ValueToString, ValueToStringVc,
};
use turbo_tasks_hash::hash_xxh3_hash64;
use util::{join_path, normalize_path, sys_to_unix, unix_to_sys};
use util::{
extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys, AsyncBufReader,
};

use self::{json::UnparseableJson, mutex_map::MutexMap};
use crate::{
Expand Down Expand Up @@ -312,26 +319,21 @@ impl Debug for DiskFileSystem {
}
}

/// Reads the file from disk, without converting the contents into a Vc.
///
/// Takes the per-path mutex for the duration of the read so concurrent
/// accesses of the same path are serialized. A `NotFound` error maps to
/// `FileContent::NotFound`; any other I/O error is surfaced with the path
/// attached for context.
async fn read_file(path: PathBuf, mutex_map: &MutexMap<PathBuf>) -> Result<FileContent> {
    // Bind the guard to a named `_lock` so it lives until the function
    // returns (a bare `_` would drop it immediately).
    let _lock = mutex_map.lock(path.clone()).await;
    // NOTE(review): retry_future presumably re-attempts transient I/O
    // failures — confirm its retry policy.
    Ok(match retry_future(|| File::from_path(path.clone())).await {
        Ok(file) => FileContent::new(file),
        Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
        Err(e) => {
            bail!(anyhow!(e).context(format!("reading file {}", path.display())))
        }
    })
}

#[turbo_tasks::value_impl]
impl FileSystem for DiskFileSystem {
/// Reads the file at `fs_path` from disk and returns its content as a cell.
///
/// Registers an invalidator for the path first, so this task is re-executed
/// when the file changes on disk. The per-path mutex serializes this read
/// against concurrent accesses of the same path. `NotFound` becomes
/// `FileContent::NotFound`; other I/O errors are reported with the path for
/// context.
#[turbo_tasks::function]
async fn read(&self, fs_path: FileSystemPathVc) -> Result<FileContentVc> {
    let full_path = self.to_sys_path(fs_path).await?;
    // NOTE(review): the `true` flag's meaning is not visible here — confirm
    // against register_invalidator's signature.
    self.register_invalidator(&full_path, true);

    // Named `_lock` keeps the guard alive for the whole read; released on
    // drop at function exit.
    let _lock = self.mutex_map.lock(full_path.clone()).await;
    let content = match retry_future(|| File::from_path(full_path.clone())).await {
        Ok(file) => FileContent::new(file),
        Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
        Err(e) => {
            bail!(anyhow!(e).context(format!("reading file {}", full_path.display())))
        }
    };
    Ok(content.cell())
}

Expand Down Expand Up @@ -490,19 +492,19 @@ impl FileSystem for DiskFileSystem {
// Track the file, so that we will rewrite it if it ever changes.
fs_path.track().await?;

// We perform an untracked read here, so that this write is not dependent on the
// read's FileContent value (and the memory it holds). Our untracked read can be
// freed immediately. Given this is an output file, it's unlikely any Turbo code
// will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let old_content = read_file(full_path.clone(), &self.mutex_map).await?;
let _lock = self.mutex_map.lock(full_path.clone()).await;

if *content == old_content {
// We perform an untracked comparison here, so that this write is not dependent
// on a read's FileContentVc (and the memory it holds). Our untracked read can
// be freed immediately. Given this is an output file, it's unlikely any Turbo
// code will need to read the file from disk into a FileContentVc, so we're not
// wasting cycles.
let compare = content.streaming_compare(full_path.clone()).await?;
if compare == FileComparison::Equal {
return Ok(CompletionVc::unchanged());
}
let _lock = self.mutex_map.lock(full_path.clone()).await;

let create_directory = old_content == FileContent::NotFound;
let create_directory = compare == FileComparison::Create;
match &*content {
FileContent::Content(file) => {
if create_directory {
Expand Down Expand Up @@ -1196,6 +1198,69 @@ impl From<File> for FileContentVc {
}
}

/// Result of comparing new in-memory content against the file currently on
/// disk (see `FileContent::streaming_compare`).
#[derive(Clone, Debug, Eq, PartialEq)]
enum FileComparison {
    // No file exists on disk (or it vanished mid-compare); it must be created.
    Create,
    // Disk contents and metadata match the new content; no write is needed.
    Equal,
    // Disk contents differ; the file must be rewritten.
    NotEqual,
}

impl FileContent {
    /// Performs a comparison of self's data against a disk file's streamed
    /// read, without loading the disk file fully into memory.
    ///
    /// Returns `Create` when no disk file exists but new content does,
    /// `Equal` when both sides match (including both being absent), and
    /// `NotEqual` otherwise. A read error while streaming the disk file is
    /// treated as `NotEqual` (forcing a rewrite) rather than propagated.
    async fn streaming_compare(&self, path: PathBuf) -> Result<FileComparison> {
        let old_file = extract_disk_access(retry_future(|| fs::File::open(&path)).await, &path)?;
        let Some(mut old_file) = old_file else {
            // Nothing on disk: equal only if we also hold no content.
            return Ok(match self {
                FileContent::NotFound => FileComparison::Equal,
                _ => FileComparison::Create,
            });
        };
        // We know old file exists, does the new file?
        let FileContent::Content(new_file) = self else {
            return Ok(FileComparison::NotEqual);
        };

        let old_meta = extract_disk_access(retry_future(|| old_file.metadata()).await, &path)?;
        let Some(old_meta) = old_meta else {
            // If we failed to get meta, then the old file has been deleted
            // since the handle was opened. In which case, we just pretend the
            // file never existed.
            return Ok(FileComparison::Create);
        };
        // If the meta is different, we need to rewrite the file to update it.
        if new_file.meta != old_meta.into() {
            return Ok(FileComparison::NotEqual);
        }

        // So meta matches, and we have a file handle. Let's stream the
        // contents to see if they match.
        let mut new_contents = new_file.read();
        let mut old_contents = AsyncBufReader::new(&mut old_file);
        Ok(loop {
            let new_chunk = new_contents.fill_buf()?;
            // A disk read error here forces a rewrite instead of bubbling up.
            let Ok(old_chunk) = old_contents.fill_buf().await else {
                break FileComparison::NotEqual;
            };

            // Only the overlapping prefix of the two buffered chunks can be
            // compared this iteration.
            let len = min(new_chunk.len(), old_chunk.len());
            if len == 0 {
                // At least one side is exhausted; equal only if both are
                // (both chunks empty).
                if new_chunk.len() == old_chunk.len() {
                    break FileComparison::Equal;
                } else {
                    break FileComparison::NotEqual;
                }
            }

            if new_chunk[0..len] != old_chunk[0..len] {
                break FileComparison::NotEqual;
            }

            // Advance both readers past the matched prefix.
            new_contents.consume(len);
            old_contents.consume(len);
        })
    }
}

bitflags! {
#[derive(Serialize, Deserialize, TraceRawVcs)]
pub struct LinkType: u8 {
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-fs/src/rope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
borrow::Cow,
cmp::min,
fmt,
io::{self, BufRead, Read, Result as IoResult, Write},
io::{BufRead, Read, Result as IoResult, Write},
mem,
ops::{AddAssign, Deref},
pin::Pin,
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Iterator for RopeReader {
}

impl Read for RopeReader {
    /// Synchronous `Read`: delegates to the shared `read_internal`, copying
    /// up to `buf.len()` bytes and returning how many were written. Always
    /// returns `Ok` (0 bytes signals end of the rope).
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
    }
}
Expand All @@ -630,7 +630,7 @@ impl AsyncRead for RopeReader {
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
) -> Poll<IoResult<()>> {
let this = self.get_mut();
this.read_internal(buf.remaining(), buf);
Poll::Ready(Ok(()))
Expand Down
88 changes: 87 additions & 1 deletion crates/turbo-tasks-fs/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
use std::borrow::Cow;
use std::{
borrow::Cow,
io::{ErrorKind, Result as IoResult},
path::Path,
pin::Pin,
task::{Context as TaskContext, Poll},
};

use anyhow::{anyhow, Result};
use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};

// Buffer size for AsyncBufReader, mirroring std's internal default:
// https://github.com/rust-lang/rust/blob/13471d3b2046cce78181dde6cfc146c09f55e29e/library/std/src/sys_common/io.rs#L1-L3
// ESP-IDF targets are memory-constrained, hence the much smaller buffer there.
const DEFAULT_BUF_SIZE: usize = if cfg!(target_os = "espidf") {
    512
} else {
    8 * 1024
};

/// Joins two /-separated paths into a normalized path.
/// Paths are concatenated with /.
Expand Down Expand Up @@ -110,3 +126,73 @@ pub fn normalize_request(str: &str) -> String {
}
seqments.join("/")
}

/// AsyncBufReader adds buffering to any [AsyncRead].
///
/// This essentially just implements [AsyncBufRead] over an [AsyncRead], using a
/// large buffer to store data. As the data is consumed, an offset buffer will
/// continue to be returned until the full buffer has been consumed. This allows
/// us to skip the overhead of, eg, repeated syscalls to read from disk as we
/// process a smaller number of bytes.
pub struct AsyncBufReader<'a, T: AsyncRead + Unpin + Sized> {
    // The wrapped unbuffered reader.
    inner: &'a mut T,
    // Index of the first unconsumed byte in `buffer`.
    offset: usize,
    // Number of valid bytes in `buffer`; `offset..capacity` is unconsumed.
    capacity: usize,
    // Fixed-size storage for buffered data.
    buffer: [u8; DEFAULT_BUF_SIZE],
}

impl<'a, T: AsyncRead + Unpin + Sized> AsyncBufReader<'a, T> {
    /// Wraps `inner` with an empty internal buffer.
    pub fn new(inner: &'a mut T) -> Self {
        AsyncBufReader {
            inner,
            offset: 0,
            capacity: 0,
            buffer: [0; DEFAULT_BUF_SIZE],
        }
    }
}

impl<'a, T: AsyncRead + Unpin + Sized> AsyncRead for AsyncBufReader<'a, T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let this = self.get_mut();
        // Serve any bytes already buffered by a prior poll_fill_buf first.
        // Forwarding straight to `inner` here (as the previous version did)
        // would silently skip those buffered-but-unconsumed bytes.
        if this.offset < this.capacity {
            let available = &this.buffer[this.offset..this.capacity];
            let len = available.len().min(buf.remaining());
            buf.put_slice(&available[..len]);
            this.offset += len;
            return Poll::Ready(Ok(()));
        }
        Pin::new(&mut this.inner).poll_read(cx, buf)
    }
}

impl<'a, T: AsyncRead + Unpin + Sized> AsyncBufRead for AsyncBufReader<'a, T> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<IoResult<&[u8]>> {
        let this = self.get_mut();
        // Refill only after everything buffered has been consumed. Returning
        // an empty slice after a refill signals EOF to the caller.
        if this.offset >= this.capacity {
            let inner = Pin::new(&mut this.inner);
            let mut buf = ReadBuf::new(&mut this.buffer);
            match inner.poll_read(cx, &mut buf) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            };

            this.capacity = buf.filled().len();
            this.offset = 0;
        }
        Poll::Ready(Ok(&this.buffer[this.offset..this.capacity]))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.get_mut();
        // Clamp like std::io::BufReader::consume so an over-large `amt`
        // cannot push `offset` past the valid region and later expose stale
        // buffer contents via the slice arithmetic in poll_fill_buf.
        this.offset = (this.offset + amt).min(this.capacity);
    }
}

/// Converts a disk access Result<T> into a Result<Some<T>>, where a NotFound
/// error results in a None value. This is purely to reduce boilerplate code
/// comparing against NotFound errors against all other errors.
/// Converts a disk-access `IoResult<T>` into a `Result<Option<T>>`.
///
/// A `NotFound` error becomes `Ok(None)`, success becomes `Ok(Some(v))`, and
/// every other error is wrapped with the file path for context. This exists
/// purely to cut the boilerplate of special-casing `NotFound` at call sites.
pub fn extract_disk_access<T>(value: IoResult<T>, path: &Path) -> Result<Option<T>> {
    match value {
        Ok(v) => Ok(Some(v)),
        Err(e) => {
            if e.kind() == ErrorKind::NotFound {
                Ok(None)
            } else {
                Err(anyhow!(e).context(format!("reading file {}", path.display())))
            }
        }
    }
}

0 comments on commit 6fd5d29

Please sign in to comment.