Skip to content

Commit

Permalink
Use Async IO for File Read/Write (#305)
Browse files Browse the repository at this point in the history
## Linux

```
OS: Ubuntu 20.04.4 LTS on Windows 10 x86_64
Kernel: 5.15.57.1-microsoft-standard-WSL2
CPU: AMD Ryzen 9 5950X (32) @ 3.400GHz
```

`cargo run --release --bin node-file-trace -- build crates/turbopack/tests/node-file-trace/integration/better-sqlite3.js`

### Before

```
done in 560ms (459ms compilation, 458ms task execution, 109028 tasks)
```

### After
```
done in 378ms (277ms compilation, 277ms task execution, 109028 tasks)
```

## macOS

```
OS: macOS 12.5.1 21G83 arm64
Host: MacBookPro18,2
CPU: Apple M1 Max
```


### Before

```
done in 482ms (380ms compilation, 379ms task execution, 111640 tasks)
```

### After

```
done in 362ms (259ms compilation, 259ms task execution, 111640 tasks)
```
  • Loading branch information
Brooooooklyn authored Sep 1, 2022
1 parent 71d5cd2 commit b115392
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 67 deletions.
50 changes: 32 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ bench = false
[dependencies]
anyhow = "1.0.47"
concurrent-queue = "1.2.2"
futures = "0.3.24"
futures-retry = "0.6.0"
json = "0.12.4"
lazy_static = "1.4.0"
mime = "0.3.16"
notify = "4.0.17"
serde = { version = "1.0.136", features = ["rc"] }
tokio = "1.11.0"
tokio-stream = { version = "0.1.9", features = ["fs"] }
turbo-tasks = { path = "../turbo-tasks" }

[dev-dependencies]
Expand Down
171 changes: 124 additions & 47 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
collections::{HashMap, HashSet},
fmt::{self, Debug, Display},
fs::{self, create_dir_all},
io::{ErrorKind, Write},
io::{self, ErrorKind},
mem::take,
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
Expand All @@ -27,6 +27,7 @@ use std::{
};

use anyhow::{anyhow, bail, Context, Result};
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
use glob::GlobVc;
use invalidator_map::InvalidatorMap;
use json::{parse, JsonValue};
Expand All @@ -35,6 +36,9 @@ use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher
use read_glob::read_glob;
pub use read_glob::{ReadGlobResult, ReadGlobResultVc};
use serde::{Deserialize, Serialize};
#[cfg(unix)]
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use turbo_tasks::{
primitives::StringVc, spawn_blocking, spawn_thread, trace::TraceRawVcs, CompletionVc,
Invalidator, ValueToString, ValueToStringVc,
Expand Down Expand Up @@ -292,13 +296,14 @@ impl DiskFileSystem {
}
}

/// Returns `true` when `err` has a kind that is treated as retryable:
/// `PermissionDenied` or `WouldBlock`. Every other kind yields `false`.
fn can_retry(err: &std::io::Error) -> bool {
    let kind = err.kind();
    kind == ErrorKind::PermissionDenied || kind == ErrorKind::WouldBlock
}

fn with_retry<T>(func: impl Fn() -> Result<T, std::io::Error>) -> Result<T, std::io::Error> {
fn can_retry(err: &std::io::Error) -> bool {
matches!(
err.kind(),
ErrorKind::PermissionDenied | ErrorKind::WouldBlock
)
}
let mut result = func();
if let Err(e) = &result {
if can_retry(e) {
Expand All @@ -319,6 +324,28 @@ fn with_retry<T>(func: impl Fn() -> Result<T, std::io::Error>) -> Result<T, std:
result
}

/// Error handler state passed to `FutureRetry` for file system operations.
struct FutureRetryHandle {
    /// Attempt number at which the handler gives up and forwards the error.
    max_attempts: usize,
}

impl Default for FutureRetryHandle {
    /// Creates a handle that gives up at the 10th attempt.
    fn default() -> Self {
        FutureRetryHandle { max_attempts: 10 }
    }
}

impl ErrorHandler<io::Error> for FutureRetryHandle {
    type OutError = io::Error;

    /// Decides what happens after a failed attempt.
    ///
    /// The error is forwarded unchanged once `attempt` has reached `max_attempts`,
    /// or immediately when `can_retry` rejects it. Otherwise the operation is
    /// retried after a delay of `10ms + attempt * 100ms`.
    fn handle(&mut self, attempt: usize, e: io::Error) -> RetryPolicy<io::Error> {
        // `>=` rather than `==`: an exact match never fires when the limit is below
        // the first attempt number reported (e.g. `max_attempts == 0`), which would
        // retry a retryable error forever.
        if attempt >= self.max_attempts || !can_retry(&e) {
            RetryPolicy::ForwardError(e)
        } else {
            RetryPolicy::WaitRetry(Duration::from_millis(10 + (attempt as u64) * 100))
        }
    }
}

impl fmt::Debug for DiskFileSystem {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "name: {}, root: {}", self.name, self.root)
Expand All @@ -335,11 +362,13 @@ impl FileSystem for DiskFileSystem {
self.invalidator_map
.insert(path_to_key(full_path.as_path()), invalidator);
}
Ok(match self
.execute(move || with_retry(move || File::from_path(&full_path)))
.await
Ok(match FutureRetry::new(
|| File::from_path_async(full_path.clone()),
FutureRetryHandle::default(),
)
.await
{
Ok(file) => FileContent::new(file),
Ok((file, _attempts)) => FileContent::new(file),
Err(_) => FileContent::NotFound,
}
.into())
Expand All @@ -353,12 +382,8 @@ impl FileSystem for DiskFileSystem {
self.dir_invalidator_map
.insert(path_to_key(full_path.as_path()), invalidator);
}
let result = self
.execute({
let full_path = full_path.clone();
move || with_retry(move || fs::read_dir(&full_path))
})
.await;
let full_path_to_read_dir = full_path.clone();
let result = with_retry(move || fs::read_dir(&full_path_to_read_dir));
Ok(match result {
Ok(res) => DirectoryContentVc::new(
res.filter_map(|e| -> Option<Result<(String, DirectoryEntry)>> {
Expand Down Expand Up @@ -455,40 +480,70 @@ impl FileSystem for DiskFileSystem {
.with_context(|| format!("reading old content of {}", full_path.display()))?;
if *content != *old_content {
let create_directory = *old_content == FileContent::NotFound;
self.execute(move || match &*content {
FileContent::Content(file) => {
if create_directory {
if let Some(parent) = full_path.parent() {
with_retry(move || fs::create_dir_all(parent)).with_context(|| {
format!(
"failed to create directory {} for write to {}",
parent.display(),
full_path.display()
async {
match &*content {
FileContent::Content(file) => {
if create_directory {
if let Some(parent) = full_path.parent() {
FutureRetry::new(
move || tokio::fs::create_dir_all(parent),
FutureRetryHandle::default(),
)
})?;
.await
.map_err(|(err, _attempts)| err)
.with_context(|| {
format!(
"failed to create directory {} for write to {}",
parent.display(),
full_path.display()
)
})?;
}
}
// println!("write {} bytes to {}", buffer.len(), full_path.display());
let full_path_to_write = full_path.clone();
FutureRetry::new(
move || {
let full_path = full_path_to_write.clone();
async move {
let full_path = full_path.clone();
let mut f = tokio::fs::File::create(&full_path).await?;
f.write_all(&file.content).await?;
#[cfg(target_family = "unix")]
fs::set_permissions(&full_path, file.meta.permissions.into())?;
Ok::<(), io::Error>(())
}
},
FutureRetryHandle::default(),
)
.await
.map_err(|(err, _attempts)| err)
.with_context(|| format!("failed to write to {}", full_path.display()))
}
FileContent::NotFound => {
// println!("remove {}", full_path.display());
let full_path_to_remove = full_path.clone();
FutureRetry::new(
move || {
let full_path = full_path_to_remove.clone();
async move {
tokio::fs::remove_file(full_path).await.or_else(|err| {
if err.kind() == ErrorKind::NotFound {
Ok(())
} else {
Err(err)
}
})
}
},
FutureRetryHandle::default(),
)
.await
.map_err(|(err, _attempts)| err)
.context(anyhow!("removing {} failed", full_path.display()))
}
// println!("write {} bytes to {}", buffer.len(), full_path.display());
with_retry(|| {
let mut f = fs::File::create(&full_path)?;
f.write_all(&file.content)?;
#[cfg(target_family = "unix")]
fs::set_permissions(&full_path, file.meta.permissions.into())?;
Ok(())
})
.with_context(|| format!("failed to write to {}", full_path.display()))
}
FileContent::NotFound => {
// println!("remove {}", full_path.display());
with_retry(|| fs::remove_file(&full_path)).or_else(|err| {
if err.kind() == ErrorKind::NotFound {
Ok(())
} else {
Err(err).context(anyhow!("removing {} failed", full_path.display()))
}
})
}
})
}
.await?;
}
Ok(CompletionVc::new())
Expand Down Expand Up @@ -962,6 +1017,28 @@ impl File {
}
}

/// Asynchronously loads the file at `p` into memory.
///
/// On unix the file is opened, its metadata is used to pre-size the read buffer
/// and is converted into `meta`. On other platforms the content is read with
/// `tokio::fs::read` and `meta` is left at its default value.
///
/// # Errors
///
/// Returns the `io::Error` produced by opening, inspecting or reading the file.
pub async fn from_path_async(p: PathBuf) -> Result<Self, std::io::Error> {
    #[cfg(unix)]
    {
        let mut file = tokio::fs::File::open(p).await?;
        let metadata = file.metadata().await?;
        // The reported length is only a capacity hint; `read_to_end` grows the buffer as needed.
        let mut content = Vec::with_capacity(metadata.len() as usize);
        file.read_to_end(&mut content).await?;
        let meta = metadata.into();
        Ok(File { meta, content })
    }
    #[cfg(not(unix))]
    {
        let content = tokio::fs::read(p).await?;
        let meta = Default::default();
        Ok(File { meta, content })
    }
}

pub fn from_source(str: String) -> Self {
File {
meta: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl PersistentTaskType {
inputs: Vec<TaskInput>,
turbo_tasks: Arc<dyn TurboTasksBackendApi>,
) -> Result<RawVc> {
let mut resolved_inputs = Vec::new();
let mut resolved_inputs = Vec::with_capacity(inputs.len());
for input in inputs.into_iter() {
resolved_inputs.push(input.resolve().await?)
}
Expand All @@ -339,7 +339,7 @@ impl PersistentTaskType {
inputs: Vec<TaskInput>,
turbo_tasks: Arc<dyn TurboTasksBackendApi>,
) -> Result<RawVc> {
let mut resolved_inputs = Vec::new();
let mut resolved_inputs = Vec::with_capacity(inputs.len());
let mut iter = inputs.into_iter();
if let Some(this) = iter.next() {
let this = this.resolve().await?;
Expand Down

0 comments on commit b115392

Please sign in to comment.