From b115392f7ae374ac410303cd537e6eaa507917c5 Mon Sep 17 00:00:00 2001 From: LongYinan Date: Thu, 1 Sep 2022 12:39:34 +0800 Subject: [PATCH] Use Async IO for File Read/Write (#305) ## Linux ``` OS: Ubuntu 20.04.4 LTS on Windows 10 x86_64 Kernel: 5.15.57.1-microsoft-standard-WSL2 CPU: AMD Ryzen 9 5950X (32) @ 3.400GHz ``` `cargo run --release --bin node-file-trace -- build crates/turbopack/tests/node-file-trace/integration/better-sqlite3.js` ### Before ``` done in 560ms (459ms compilation, 458ms task execution, 109028 tasks) ``` ### After ``` done in 378ms (277ms compilation, 277ms task execution, 109028 tasks) ``` ## macOS ``` OS: macOS 12.5.1 21G83 arm64 Host: MacBookPro18,2 CPU: Apple M1 Max ``` ### Before ``` done in 482ms (380ms compilation, 379ms task execution, 111640 tasks) ``` ### After ``` done in 362ms (259ms compilation, 259ms task execution, 111640 tasks) ``` --- Cargo.lock | 50 +++++---- crates/turbo-tasks-fs/Cargo.toml | 3 + crates/turbo-tasks-fs/src/lib.rs | 171 ++++++++++++++++++++++-------- crates/turbo-tasks/src/backend.rs | 4 +- 4 files changed, 161 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd56940989591..c7fa93d848897 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1067,9 +1067,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" dependencies = [ "futures-channel", "futures-core", @@ -1082,9 +1082,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", "futures-sink", @@ -1092,15 +1092,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" [[package]] name = "futures-executor" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" dependencies = [ "futures-core", "futures-task", @@ -1109,32 +1109,43 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" [[package]] name = "futures-macro" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2 1.0.43", "quote 1.0.21", "syn 1.0.99", ] +[[package]] +name = "futures-retry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" [[package]] name = "futures-timer" @@ -1144,9 +1155,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ "futures-channel", "futures-core", @@ -4458,6 +4469,8 @@ version = "0.1.0" dependencies = [ "anyhow", "concurrent-queue", + "futures", + "futures-retry", "json", "lazy_static", "mime", @@ -4466,6 +4479,7 @@ dependencies = [ "serde", "sha2", "tokio", + "tokio-stream", "turbo-tasks", "turbo-tasks-build", "turbo-tasks-memory", diff --git a/crates/turbo-tasks-fs/Cargo.toml b/crates/turbo-tasks-fs/Cargo.toml index 63f268d6c95ad..fb68cc4d58410 100644 --- a/crates/turbo-tasks-fs/Cargo.toml +++ b/crates/turbo-tasks-fs/Cargo.toml @@ -11,12 +11,15 @@ bench = false [dependencies] anyhow = "1.0.47" concurrent-queue = "1.2.2" +futures = "0.3.24" +futures-retry = "0.6.0" json = "0.12.4" lazy_static = "1.4.0" mime = "0.3.16" notify = "4.0.17" serde = { version = "1.0.136", features = ["rc"] } tokio = "1.11.0" +tokio-stream = { version = "0.1.9", features = ["fs"] } turbo-tasks = { path = "../turbo-tasks" } [dev-dependencies] diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index 002888d17d453..82eb8cc9bbd44 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -15,7 +15,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Debug, Display}, fs::{self, create_dir_all}, - io::{ErrorKind, Write}, + io::{self, ErrorKind}, mem::take, path::{Path, PathBuf, MAIN_SEPARATOR}, sync::{ @@ -27,6 +27,7 @@ use std::{ }; use anyhow::{anyhow, bail, Context, Result}; +use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy}; use glob::GlobVc; use invalidator_map::InvalidatorMap; use json::{parse, JsonValue}; @@ -35,6 +36,9 @@ use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher use read_glob::read_glob; pub use read_glob::{ReadGlobResult, ReadGlobResultVc}; use serde::{Deserialize, Serialize}; +#[cfg(unix)] +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; use turbo_tasks::{ primitives::StringVc, spawn_blocking, spawn_thread, trace::TraceRawVcs, CompletionVc, Invalidator, ValueToString, ValueToStringVc, @@ -292,13 +296,14 @@ impl DiskFileSystem { } } +fn can_retry(err: &std::io::Error) -> bool { + matches!( + err.kind(), + ErrorKind::PermissionDenied | ErrorKind::WouldBlock + ) +} + fn with_retry(func: impl Fn() -> Result) -> Result { - fn can_retry(err: &std::io::Error) -> bool { - matches!( - err.kind(), - ErrorKind::PermissionDenied | ErrorKind::WouldBlock - ) - } let mut result = func(); if let Err(e) = &result { if can_retry(e) { @@ -319,6 +324,28 @@ fn with_retry(func: impl Fn() -> Result) -> Result Self { + Self { max_attempts: 10 } + } +} + +impl ErrorHandler for FutureRetryHandle { + type OutError = io::Error; + + fn handle(&mut self, attempt: usize, e: io::Error) -> RetryPolicy { + if attempt == self.max_attempts || !can_retry(&e) { + RetryPolicy::ForwardError(e) + } else { + RetryPolicy::WaitRetry(Duration::from_millis(10 + (attempt as u64) * 100)) + } + } +} + impl fmt::Debug for DiskFileSystem { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "name: {}, root: {}", self.name, self.root) @@ -335,11 +362,13 @@ impl FileSystem for DiskFileSystem { self.invalidator_map .insert(path_to_key(full_path.as_path()), invalidator); } - Ok(match self - .execute(move || with_retry(move || File::from_path(&full_path))) - .await + Ok(match FutureRetry::new( + || File::from_path_async(full_path.clone()), + FutureRetryHandle::default(), + ) + .await { - Ok(file) => FileContent::new(file), + Ok((file, _attempts)) => FileContent::new(file), Err(_) => FileContent::NotFound, } .into()) @@ -353,12 +382,8 @@ impl FileSystem for DiskFileSystem { self.dir_invalidator_map .insert(path_to_key(full_path.as_path()), invalidator); } - let result = self - .execute({ - let full_path = full_path.clone(); - move || with_retry(move || fs::read_dir(&full_path)) - }) - .await; + let full_path_to_read_dir = full_path.clone(); + let result = with_retry(move || fs::read_dir(&full_path_to_read_dir)); Ok(match result { Ok(res) => DirectoryContentVc::new( res.filter_map(|e| -> Option> { @@ -455,40 +480,70 @@ impl FileSystem for DiskFileSystem { .with_context(|| format!("reading old content of {}", full_path.display()))?; if *content != *old_content { let create_directory = *old_content == FileContent::NotFound; - self.execute(move || match &*content { - FileContent::Content(file) => { - if create_directory { - if let Some(parent) = full_path.parent() { - with_retry(move || fs::create_dir_all(parent)).with_context(|| { - format!( - "failed to create directory {} for write to {}", - parent.display(), - full_path.display() + async { + match &*content { + FileContent::Content(file) => { + if create_directory { + if let Some(parent) = full_path.parent() { + FutureRetry::new( + move || tokio::fs::create_dir_all(parent), + FutureRetryHandle::default(), ) - })?; + .await + .map_err(|(err, _attempts)| err) + .with_context(|| { + format!( + "failed to create directory {} for write to {}", + parent.display(), + full_path.display() + ) + })?; + } } + // println!("write {} bytes to {}", buffer.len(), full_path.display()); + let full_path_to_write = full_path.clone(); + FutureRetry::new( + move || { + let full_path = full_path_to_write.clone(); + async move { + let full_path = full_path.clone(); + let mut f = tokio::fs::File::create(&full_path).await?; + f.write_all(&file.content).await?; + #[cfg(target_family = "unix")] + fs::set_permissions(&full_path, file.meta.permissions.into())?; + Ok::<(), io::Error>(()) + } + }, + FutureRetryHandle::default(), + ) + .await + .map_err(|(err, _attempts)| err) + .with_context(|| format!("failed to write to {}", full_path.display())) + } + FileContent::NotFound => { + // println!("remove {}", full_path.display()); + let full_path_to_remove = full_path.clone(); + FutureRetry::new( + move || { + let full_path = full_path_to_remove.clone(); + async move { + tokio::fs::remove_file(full_path).await.or_else(|err| { + if err.kind() == ErrorKind::NotFound { + Ok(()) + } else { + Err(err) + } + }) + } + }, + FutureRetryHandle::default(), + ) + .await + .map_err(|(err, _attempts)| err) + .context(anyhow!("removing {} failed", full_path.display())) } - // println!("write {} bytes to {}", buffer.len(), full_path.display()); - with_retry(|| { - let mut f = fs::File::create(&full_path)?; - f.write_all(&file.content)?; - #[cfg(target_family = "unix")] - fs::set_permissions(&full_path, file.meta.permissions.into())?; - Ok(()) - }) - .with_context(|| format!("failed to write to {}", full_path.display())) - } - FileContent::NotFound => { - // println!("remove {}", full_path.display()); - with_retry(|| fs::remove_file(&full_path)).or_else(|err| { - if err.kind() == ErrorKind::NotFound { - Ok(()) - } else { - Err(err).context(anyhow!("removing {} failed", full_path.display())) - } - }) } - }) + } .await?; } Ok(CompletionVc::new()) @@ -962,6 +1017,28 @@ impl File { } } + pub async fn from_path_async(p: PathBuf) -> Result { + #[cfg(unix)] + { + let mut f = tokio::fs::File::open(p).await?; + let metadata = f.metadata().await?; + let mut output = Vec::with_capacity(metadata.len() as usize); + f.read_to_end(&mut output).await?; + Ok(File { + meta: metadata.into(), + content: output, + }) + } + #[cfg(not(unix))] + { + let output = tokio::fs::read(p).await?; + Ok(File { + meta: Default::default(), + content: output, + }) + } + } + pub fn from_source(str: String) -> Self { File { meta: Default::default(), diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index e9c5402e449b9..541af328273cd 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -326,7 +326,7 @@ impl PersistentTaskType { inputs: Vec, turbo_tasks: Arc, ) -> Result { - let mut resolved_inputs = Vec::new(); + let mut resolved_inputs = Vec::with_capacity(inputs.len()); for input in inputs.into_iter() { resolved_inputs.push(input.resolve().await?) } @@ -339,7 +339,7 @@ impl PersistentTaskType { inputs: Vec, turbo_tasks: Arc, ) -> Result { - let mut resolved_inputs = Vec::new(); + let mut resolved_inputs = Vec::with_capacity(inputs.len()); let mut iter = inputs.into_iter(); if let Some(this) = iter.next() { let this = this.resolve().await?;