From 0959ac8a999b985701db9c4022d742dd2b2f13d7 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Wed, 13 Mar 2024 12:53:03 -0400 Subject: [PATCH] Add backoff for transient Windows failures --- Cargo.lock | 15 +++++++ Cargo.toml | 1 + crates/uv-cache/src/lib.rs | 4 +- .../src/distribution_database.rs | 2 + crates/uv-fs/Cargo.toml | 3 +- crates/uv-fs/src/lib.rs | 41 +++++++++++++++++++ crates/uv-installer/src/downloader.rs | 20 ++++++--- 7 files changed, 77 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 410cd7534d54..a58cb2a6e73f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,6 +265,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -4386,6 +4400,7 @@ dependencies = [ name = "uv-fs" version = "0.0.1" dependencies = [ + "backoff", "dunce", "encoding_rs_io", "fs-err", diff --git a/Cargo.toml b/Cargo.toml index c738478769c8..316ea17d3457 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ async-trait = { version = "0.1.77" } async-recursion = { version = "1.0.5" } async_http_range_reader = { version = "0.7.0" } async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "d76801da0943de985254fc6255c0e476b57c5836", features = ["deflate"] } +backoff = { version = "0.4.0" } base64 = { version = "0.21.7" } cachedir = { version = "0.3.1" } cargo-util = { version = "0.2.8" } diff --git a/crates/uv-cache/src/lib.rs b/crates/uv-cache/src/lib.rs index 592f88c22c20..c435dc5fc5fd 100644 --- a/crates/uv-cache/src/lib.rs +++ b/crates/uv-cache/src/lib.rs @@ -206,7 +206,7 @@ impl Cache { } /// Persist a temporary directory to the artifact store. - pub fn persist( + pub async fn persist( &self, temp_dir: impl AsRef, path: impl AsRef, @@ -218,7 +218,7 @@ impl Cache { // Move the temporary directory into the directory store. let archive_entry = self.entry(CacheBucket::Archive, "", id); fs_err::create_dir_all(archive_entry.dir())?; - fs_err::rename(temp_dir.as_ref(), archive_entry.path())?; + uv_fs::rename_with_retry(temp_dir.as_ref(), archive_entry.path()).await?; // Create a symlink to the directory store. fs_err::create_dir_all(path.as_ref().parent().expect("Cache entry to have parent"))?; diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index 91b2eb95b9ad..c9882b574c91 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -451,6 +451,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let archive = self .cache .persist(temp_dir.into_path(), wheel_entry.path()) + .await .map_err(Error::CacheRead)?; Ok(archive) } @@ -532,6 +533,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let archive = self .cache .persist(temp_dir.into_path(), wheel_entry.path()) + .await .map_err(Error::CacheRead)?; Ok(archive) } diff --git a/crates/uv-fs/Cargo.toml b/crates/uv-fs/Cargo.toml index e76d99805e24..cca796721322 100644 --- a/crates/uv-fs/Cargo.toml +++ b/crates/uv-fs/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [dependencies] uv-warnings = { path = "../uv-warnings" } +backoff = { workspace = true } dunce = { workspace = true } encoding_rs_io = { workspace = true } fs-err = { workspace = true } @@ -27,4 +28,4 @@ urlencoding = { workspace = true } [features] default = [] -tokio = ["fs-err/tokio", "dep:tokio"] +tokio = ["dep:tokio", "fs-err/tokio", "backoff/tokio"] diff --git a/crates/uv-fs/src/lib.rs b/crates/uv-fs/src/lib.rs index 195286219a8f..008c13cfe5b5 100644 --- a/crates/uv-fs/src/lib.rs +++ b/crates/uv-fs/src/lib.rs @@ -162,6 +162,47 @@ pub fn force_remove_all(path: impl AsRef) -> Result Ok(true) } +/// Rename a file, retrying (on Windows) if it fails due to transient operating system errors. +#[cfg(feature = "tokio")] +pub async fn rename_with_retry( + from: impl AsRef, + to: impl AsRef, +) -> Result<(), std::io::Error> { + if cfg!(windows) { + // On Windows, antivirus software can lock files temporarily, making them inaccessible. + // This is most common for DLLs, and the common suggestion is to retry the operation with + // some backoff. + // + // See: + let from = from.as_ref(); + let to = to.as_ref(); + + let backoff = backoff::ExponentialBackoffBuilder::default() + .with_initial_interval(std::time::Duration::from_millis(10)) + .with_max_elapsed_time(Some(std::time::Duration::from_secs(10))) + .build(); + + backoff::future::retry(backoff, || async move { + match fs_err::rename(from, to) { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => { + warn!( + "Retrying rename from {} to {} due to transient error: {}", + from.display(), + to.display(), + err + ); + Err(backoff::Error::transient(err)) + } + Err(err) => Err(backoff::Error::permanent(err)), + } + }) + .await + } else { + fs_err::tokio::rename(from, to).await + } +} + /// Iterate over the subdirectories of a directory. /// /// If the directory does not exist, returns an empty iterator. diff --git a/crates/uv-installer/src/downloader.rs b/crates/uv-installer/src/downloader.rs index 9c7b54751d71..61d6c79bfee1 100644 --- a/crates/uv-installer/src/downloader.rs +++ b/crates/uv-installer/src/downloader.rs @@ -1,8 +1,9 @@ use std::cmp::Reverse; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use tempfile::TempDir; use tokio::task::JoinError; use tracing::instrument; use url::Url; @@ -27,6 +28,8 @@ pub enum Error { Join(#[from] JoinError), #[error(transparent)] Editable(#[from] uv_distribution::Error), + #[error("Failed to write to the client cache")] + CacheWrite(#[source] std::io::Error), #[error("Unzip failed in another thread: {0}")] Thread(String), } @@ -197,21 +200,26 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { } // Unzip the wheel. - let archive = tokio::task::spawn_blocking({ + let temp_dir = tokio::task::spawn_blocking({ let download = download.clone(); let cache = self.cache.clone(); - move || -> Result { + move || -> Result { // Unzip the wheel into a temporary directory. let temp_dir = tempfile::tempdir_in(cache.root())?; download.unzip(temp_dir.path())?; - - // Persist the temporary directory to the directory store. - Ok(cache.persist(temp_dir.into_path(), download.target())?) + Ok(temp_dir) } }) .await? .map_err(|err| Error::Unzip(download.remote().clone(), err))?; + // Persist the temporary directory to the directory store. + let archive = self + .cache + .persist(temp_dir.into_path(), download.target()) + .map_err(Error::CacheWrite) + .await?; + Ok(download.into_cached_dist(archive)) } }