From 9a8b56e468858ed0518939cd23d44361f69a3e86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Audiger?= Date: Wed, 28 Aug 2024 08:56:40 +0200 Subject: [PATCH] refactor: move from tokio-tar to tar-rs library to fix an issue when peeling archive. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémy Audiger --- Cargo.lock | 54 +++++++---- crates/brioche-core/Cargo.toml | 7 +- crates/brioche-core/src/bake/unarchive.rs | 32 ++++--- crates/brioche-core/src/blob.rs | 109 ++++++++++++++++++++++ crates/brioche-core/src/download.rs | 2 +- 5 files changed, 171 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef92078..ce21efd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,7 @@ dependencies = [ "sqlx", "strum 0.26.3", "superconsole", + "tar", "termwiz 0.22.0", "thiserror", "tick-encoding", @@ -1289,7 +1290,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio", + "mio 0.8.10", "parking_lot 0.12.3", "signal-hook 0.3.17", "signal-hook-mio", @@ -2115,9 +2116,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -2747,6 +2748,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "mockito" version = "1.2.0" @@ -4339,7 +4352,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio", + "mio 0.8.10", "signal-hook 0.3.17", ] @@ -5210,6 +5223,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -5467,29 +5491,28 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", "tracing", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -5535,9 +5558,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", @@ -5548,7 +5571,6 @@ dependencies = [ "pin-project-lite", "slab", "tokio", - "tracing", ] [[package]] @@ -6470,9 +6492,9 @@ dependencies = [ [[package]] name = "xattr" -version = "1.1.3" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7dae5072fe1f8db8f8d29059189ac175196e410e40ba42d5d4684ae2f750995" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" dependencies = [ "libc", "linux-raw-sys", diff --git a/crates/brioche-core/Cargo.toml b/crates/brioche-core/Cargo.toml index 874dffc..23d2246 100644 --- a/crates/brioche-core/Cargo.toml +++ b/crates/brioche-core/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] anyhow = { version = "1.0.75", features = ["backtrace"] } -async-compression = { version = "0.4.5", features = ["tokio", "bzip2", "gzip", "xz", "zstd"] } +async-compression = { version = "0.4.5", features = ["bzip2", "gzip", "xz", "zstd"] } async-recursion = "1.0.5" biome_formatter = "0.5.7" biome_js_formatter = "0.5.7" @@ -48,12 +48,13 @@ sha2 = "0.10.8" sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "json"] } strum = { version = "0.26.3", features = ["derive"] } superconsole = "0.2.0" +tar = "0.4.41" termwiz = "0.22.0" thiserror = "1.0.51" tick-encoding = "0.1.2" -tokio = { version = "1.35.0", features = ["full", "tracing"] } +tokio = { version = "1.39.3", features = ["full", "tracing"] } tokio-tar = "0.3.1" -tokio-util = { version = "0.7.10", features = ["compat", "full"] } +tokio-util = { version = "0.7.11", features = ["compat", "full"] } toml = "0.8.8" tower-lsp = "0.20.0" tracing = "0.1.40" diff --git a/crates/brioche-core/src/bake/unarchive.rs b/crates/brioche-core/src/bake/unarchive.rs index 244f3db..3ab3c88 100644 --- a/crates/brioche-core/src/bake/unarchive.rs +++ b/crates/brioche-core/src/bake/unarchive.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Context as _; use bstr::BString; -use futures::TryStreamExt as _; +use tokio::io::AsyncReadExt; use tracing::Instrument; use crate::{ @@ -38,15 +38,21 @@ pub async fn bake_unarchive( let uncompressed_archive_size = archive_file.metadata().await?.len(); let archive_file = tokio::io::BufReader::new(archive_file); - let decompressed_archive_file = unarchive.compression.decompress(archive_file); + let mut decompressed_archive_file = unarchive.compression.decompress(archive_file); + let mut decompressed_archive_buffer = Vec::new(); + decompressed_archive_file + .read(&mut decompressed_archive_buffer) + .await?; - let mut archive = tokio_tar::Archive::new(decompressed_archive_file); - let mut archive_entries = archive.entries()?; + let archive = std::sync::Mutex::new(std::sync::Arc::new(tar::Archive::new( + std::io::Cursor::new(decompressed_archive_buffer), + ))); + let mut archive_entries = archive.lock()?.entries()?; let mut directory_entries = BTreeMap::>::new(); let mut buffer = Vec::new(); let save_blobs_future = async { - while let Some(archive_entry) = archive_entries.try_next().await? { + while let Some(archive_entry) = archive_entries.next().transpose()? { let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned()); let entry_mode = archive_entry.header().mode()?; @@ -59,7 +65,7 @@ pub async fn bake_unarchive( ); let entry_artifact = match archive_entry.header().entry_type() { - tokio_tar::EntryType::Regular => { + tar::EntryType::Regular => { let mut permit = crate::blob::get_save_blob_permit().await?; let entry_blob_hash = crate::blob::save_blob_from_reader( brioche, @@ -77,7 +83,7 @@ pub async fn bake_unarchive( resources: Directory::default(), })) } - tokio_tar::EntryType::Symlink => { + tar::EntryType::Symlink => { let link_name = archive_entry.link_name_bytes().with_context(|| { format!( "unsupported tar archive: no link name for symlink entry at {}", @@ -89,7 +95,7 @@ pub async fn bake_unarchive( target: link_name.into_owned().into(), }) } - tokio_tar::EntryType::Link => { + tar::EntryType::Link => { let link_name = archive_entry.link_name_bytes().with_context(|| { format!( "unsupported tar archive: no link name for hardlink entry at {}", @@ -99,15 +105,15 @@ pub async fn bake_unarchive( let linked_entry = directory_entries.get(link_name.as_ref()).with_context(|| { format!( - "unsupported tar archive: could not find target for link entry at {}", - entry_path - ) + "unsupported tar archive: could not find target for link entry at {}", + entry_path + ) })?; Some(linked_entry.value.clone()) } - tokio_tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())), - tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => { + tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())), + tar::EntryType::XGlobalHeader | tar::EntryType::XHeader => { // Ignore None } diff --git a/crates/brioche-core/src/blob.rs b/crates/brioche-core/src/blob.rs index 333224d..873b408 100644 --- a/crates/brioche-core/src/blob.rs +++ b/crates/brioche-core/src/blob.rs @@ -124,6 +124,115 @@ pub async fn save_blob_from_reader<'a, R>( mut options: SaveBlobOptions<'a>, buffer: &mut Vec, ) -> anyhow::Result +where + R: std::io::Read + Unpin, +{ + anyhow::ensure!(!options.remove_input, "cannot remove input from reader"); + + let mut hasher = blake3::Hasher::new(); + let mut validation_hashing = options + .expected_hash + .as_ref() + .map(|validate_hash| (validate_hash, super::Hasher::for_hash(validate_hash))); + + let temp_dir = brioche.home.join("blobs-temp"); + tokio::fs::create_dir_all(&temp_dir).await.unwrap(); + let temp_path = temp_dir.join(ulid::Ulid::new().to_string()); + let mut temp_file = tokio::fs::File::create(&temp_path) + .await + .context("failed to open temp file")?; + + tracing::trace!(temp_path = %temp_path.display(), "saving blob"); + + let mut buffer = vec![0u8; 1024 * 1024]; + let mut total_bytes_read = 0; + loop { + let length = input.read(&mut buffer).context("failed to read")?; + if length == 0 { + break; + } + + total_bytes_read += length; + let buffer = &buffer[..length]; + + temp_file + .write_all(buffer) + .await + .context("failed to write all")?; + + hasher.update(buffer); + + if let Some((_, validate_hasher)) = &mut validation_hashing { + validate_hasher.update(buffer); + } + + if let Some(on_progress) = &mut options.on_progress { + on_progress(total_bytes_read)?; + } + } + + let hash = hasher.finalize(); + let blob_hash = BlobHash(hash); + let blob_path = local_blob_path(brioche, blob_hash); + + if let Some((expected_hash, validate_hasher)) = validation_hashing { + let actual_hash = validate_hasher.finish()?; + + if *expected_hash != actual_hash { + anyhow::bail!("expected hash {} but got {}", expected_hash, actual_hash); + } + + let expected_hash_string = expected_hash.to_string(); + let blob_hash_string = blob_hash.to_string(); + + let mut db_conn = brioche.db_conn.lock().await; + let mut db_transaction = db_conn.begin().await?; + sqlx::query!( + r" + INSERT INTO blob_aliases (hash, blob_hash) VALUES (?, ?) + ON CONFLICT (hash) DO UPDATE SET blob_hash = ? + ", + expected_hash_string, + blob_hash_string, + blob_hash_string, + ) + .execute(&mut *db_transaction) + .await?; + db_transaction.commit().await?; + drop(db_conn); + } + + if let Some(parent) = blob_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + tracing::debug!(overwrite = blob_path.exists(), %blob_hash, "saved blob"); + + temp_file + .set_permissions(blob_permissions()) + .await + .context("failed to set blob permissions")?; + let temp_file = temp_file.into_std().await; + tokio::task::spawn_blocking(move || { + temp_file.set_modified(crate::fs_utils::brioche_epoch())?; + anyhow::Ok(()) + }) + .await??; + + tokio::fs::rename(&temp_path, &blob_path) + .await + .context("failed to rename blob from temp file")?; + + Ok(blob_hash) +} + +#[tracing::instrument(skip_all)] +pub async fn save_blob_from_async_reader<'a, R>( + brioche: &Brioche, + _permit: SaveBlobPermit<'_>, + mut input: R, + mut options: SaveBlobOptions<'a>, +) -> anyhow::Result where R: tokio::io::AsyncRead + Unpin, { diff --git a/crates/brioche-core/src/download.rs b/crates/brioche-core/src/download.rs index 5795214..1f57a11 100644 --- a/crates/brioche-core/src/download.rs +++ b/crates/brioche-core/src/download.rs @@ -61,7 +61,7 @@ pub async fn download( Ok(()) }); - let blob_hash = crate::blob::save_blob_from_reader( + let blob_hash = crate::blob::save_blob_from_async_reader( brioche, &mut save_blob_permit, download_stream,