From bad517d4fa05a7721772af9884f9015cc1212cd2 Mon Sep 17 00:00:00 2001 From: Kyle Lacy Date: Tue, 24 Sep 2024 21:39:45 -0700 Subject: [PATCH] Update unarchive to use `tar` crate instead of `tokio_tar` --- Cargo.lock | 41 ++--- crates/brioche-core/Cargo.toml | 2 +- crates/brioche-core/src/bake/unarchive.rs | 201 ++++++++++++++-------- crates/brioche-core/src/blob.rs | 68 +++++++- 4 files changed, 207 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14e6634..9579cbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -883,11 +883,11 @@ dependencies = [ "sqlx", "strum 0.26.3", "superconsole", + "tar", "termwiz 0.22.0", "thiserror", "tick-encoding", "tokio", - "tokio-tar", "tokio-util", "toml", "tower-lsp", @@ -4654,15 +4654,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_syscall" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -6285,6 +6276,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ff6c40d3aedb5e06b57c6f669ad17ab063dd1e63d977c6a88e7f4dfa4f04020" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -6593,21 +6595,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tar" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" -dependencies = [ - "filetime", - "futures-core", - "libc", - "redox_syscall 0.3.5", - "tokio", - "tokio-stream", - "xattr", -] - [[package]] name = "tokio-util" version = "0.7.10" @@ -7599,9 +7586,9 @@ dependencies = [ [[package]] name = "xattr" -version = "1.1.3" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7dae5072fe1f8db8f8d29059189ac175196e410e40ba42d5d4684ae2f750995" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" dependencies = [ "libc", "linux-raw-sys", diff --git a/crates/brioche-core/Cargo.toml b/crates/brioche-core/Cargo.toml index c849485..c35bc54 100644 --- a/crates/brioche-core/Cargo.toml +++ b/crates/brioche-core/Cargo.toml @@ -52,7 +52,6 @@ termwiz = "0.22.0" thiserror = "1.0.51" tick-encoding = "0.1.2" tokio = { version = "1.35.0", features = ["full", "tracing"] } -tokio-tar = "0.3.1" tokio-util = { version = "0.7.10", features = ["compat", "full"] } toml = "0.8.8" tower-lsp = "0.20.0" @@ -66,6 +65,7 @@ walkdir = "2.5.0" petgraph = "0.6.5" wax = { version = "0.6.0", default-features = false } gix = { version = "0.66.0", features = ["blocking-network-client", "blocking-http-transport-reqwest"] } +tar = "0.4.42" [dev-dependencies] assert_matches = "1.5.0" diff --git a/crates/brioche-core/src/bake/unarchive.rs b/crates/brioche-core/src/bake/unarchive.rs index e1e6d8b..b173729 100644 --- a/crates/brioche-core/src/bake/unarchive.rs +++ b/crates/brioche-core/src/bake/unarchive.rs @@ -2,10 +2,9 @@ use std::{collections::BTreeMap, sync::Arc}; use anyhow::Context as _; use bstr::BString; -use futures::TryStreamExt as _; -use tracing::Instrument; use crate::{ + blob::BlobHash, recipe::{Artifact, Directory, File, Meta, Unarchive, WithMeta}, Brioche, }; @@ -40,84 +39,121 @@ pub async fn bake_unarchive( let decompressed_archive_file = unarchive.compression.decompress(archive_file); - let mut archive = tokio_tar::Archive::new(decompressed_archive_file); - let mut archive_entries = archive.entries()?; - let mut directory_entries = BTreeMap::>::new(); - let mut buffer = Vec::new(); - - let save_blobs_future = async { - while let Some(archive_entry) = archive_entries.try_next().await? { - let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned()); - let entry_mode = archive_entry.header().mode()?; - - let position = archive_entry.raw_file_position(); - let estimated_progress = position as f64 / (uncompressed_archive_size as f64).max(1.0); - let progress_percent = (estimated_progress * 100.0).min(99.0) as u8; - brioche.reporter.update_job( - job_id, - crate::reporter::UpdateJob::Unarchive { progress_percent }, - ); + let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16); + + let mut permit = crate::blob::get_save_blob_permit().await?; + let process_archive_task = tokio::task::spawn_blocking({ + let brioche = brioche.clone(); + move || { + let decompressed_archive_file = + tokio_util::io::SyncIoBridge::new(decompressed_archive_file); + let mut archive = tar::Archive::new(decompressed_archive_file); + + let mut buffer = Vec::new(); + + for archive_entry in archive.entries()? { + let archive_entry = archive_entry?; + let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned()); + let entry_mode = archive_entry.header().mode()?; + + let position = archive_entry.raw_file_position(); + let estimated_progress = + position as f64 / (uncompressed_archive_size as f64).max(1.0); + let progress_percent = (estimated_progress * 100.0).min(99.0) as u8; + brioche.reporter.update_job( + job_id, + crate::reporter::UpdateJob::Unarchive { progress_percent }, + ); + + let entry = match archive_entry.header().entry_type() { + tar::EntryType::Regular => { + let entry_blob_hash = crate::blob::save_blob_from_reader_sync( + &brioche, + &mut permit, + archive_entry, + crate::blob::SaveBlobOptions::new(), + &mut buffer, + )?; + let executable = entry_mode & 0o100 != 0; + + ArchiveEntry::File { + content_blob: entry_blob_hash, + executable, + } + } + tar::EntryType::Symlink => { + let link_name = archive_entry.link_name_bytes().with_context(|| { + format!( + "unsupported tar archive: no link name for symlink entry at {}", + entry_path + ) + })?; - let entry_artifact = match archive_entry.header().entry_type() { - tokio_tar::EntryType::Regular => { - let mut permit = crate::blob::get_save_blob_permit().await?; - let entry_blob_hash = crate::blob::save_blob_from_reader( - brioche, - &mut permit, - archive_entry, - crate::blob::SaveBlobOptions::new(), - &mut buffer, - ) - .await?; - let executable = entry_mode & 0o100 != 0; - - Some(Artifact::File(File { - content_blob: entry_blob_hash, - executable, - resources: Directory::default(), - })) - } - tokio_tar::EntryType::Symlink => { - let link_name = archive_entry.link_name_bytes().with_context(|| { - format!( - "unsupported tar archive: no link name for symlink entry at {}", + ArchiveEntry::Symlink { + target: link_name.into_owned().into(), + } + } + tar::EntryType::Link => { + let link_name = archive_entry.link_name_bytes().with_context(|| { + format!( + "unsupported tar archive: no link name for hardlink entry at {}", + entry_path + ) + })?; + + ArchiveEntry::Link { + link_name: link_name.into_owned().into(), + } + } + tar::EntryType::Directory => ArchiveEntry::Directory, + tar::EntryType::XGlobalHeader | tar::EntryType::XHeader => { + // Ignore + continue; + } + other => { + anyhow::bail!( + "unsupported tar archive: unsupported entry type {:?} at {}", + other, entry_path - ) - })?; + ); + } + }; - Some(Artifact::Symlink { - target: link_name.into_owned().into(), - }) - } - tokio_tar::EntryType::Link => { - let link_name = archive_entry.link_name_bytes().with_context(|| { + entry_tx.blocking_send((entry_path, entry))?; + } + + anyhow::Ok(()) + } + }); + let process_archive_task = async { + process_archive_task.await??; + anyhow::Result::<()>::Ok(()) + }; + + let build_directory_fut = async { + let mut directory_entries = BTreeMap::>::new(); + + while let Some((entry_path, entry)) = entry_rx.recv().await { + let entry_artifact = match entry { + ArchiveEntry::File { + content_blob, + executable, + } => Some(Artifact::File(File { + content_blob, + executable, + resources: Directory::default(), + })), + ArchiveEntry::Symlink { target } => Some(Artifact::Symlink { target }), + ArchiveEntry::Link { link_name } => { + let linked_entry = directory_entries.get(&link_name).with_context(|| { format!( - "unsupported tar archive: no link name for hardlink entry at {}", - entry_path - ) - })?; - let linked_entry = - directory_entries.get(link_name.as_ref()).with_context(|| { - format!( "unsupported tar archive: could not find target for link entry at {}", entry_path ) - })?; - + })?; Some(linked_entry.value.clone()) } - tokio_tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())), - tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => { - // Ignore - None - } - other => { - anyhow::bail!( - "unsupported tar archive: unsupported entry type {:?} at {}", - other, - entry_path - ); - } + ArchiveEntry::Directory => Some(Artifact::Directory(Directory::default())), }; let entry_path = crate::fs_utils::logical_path_bytes(&entry_path); @@ -144,13 +180,26 @@ pub async fn bake_unarchive( }, ); - anyhow::Ok(()) - } - .instrument(tracing::info_span!("save_blobs")); + let directory = Directory::create(brioche, &directory_entries).await?; - save_blobs_future.await?; + anyhow::Ok(directory) + }; - let directory = Directory::create(brioche, &directory_entries).await?; + let (_, directory) = tokio::try_join!(process_archive_task, build_directory_fut)?; Ok(directory) } + +enum ArchiveEntry { + File { + content_blob: BlobHash, + executable: bool, + }, + Symlink { + target: BString, + }, + Link { + link_name: BString, + }, + Directory, +} diff --git a/crates/brioche-core/src/blob.rs b/crates/brioche-core/src/blob.rs index 978cc24..cbf3835 100644 --- a/crates/brioche-core/src/blob.rs +++ b/crates/brioche-core/src/blob.rs @@ -1,5 +1,5 @@ use std::{ - io::Read as _, + io::{Read as _, Write as _}, os::unix::prelude::PermissionsExt as _, path::{Path, PathBuf}, }; @@ -226,6 +226,72 @@ where Ok(blob_hash) } +#[tracing::instrument(skip_all)] +pub fn save_blob_from_reader_sync<'a, R>( + brioche: &Brioche, + _permit: &mut SaveBlobPermit<'_>, + mut input: R, + mut options: SaveBlobOptions<'a>, + buffer: &mut Vec, +) -> anyhow::Result +where + R: std::io::Read, +{ + anyhow::ensure!(!options.remove_input, "cannot remove input from reader"); + anyhow::ensure!( + options.expected_hash.is_none(), + "cannot validate expected hash in sync mode" + ); + + let mut hasher = blake3::Hasher::new(); + + let temp_dir = brioche.home.join("blobs-temp"); + std::fs::create_dir_all(&temp_dir).unwrap(); + let temp_path = temp_dir.join(ulid::Ulid::new().to_string()); + let mut temp_file = std::fs::File::create(&temp_path).context("failed to open temp file")?; + + tracing::trace!(temp_path = %temp_path.display(), "saving blob"); + + buffer.resize(1024 * 1024, 0); + let mut total_bytes_read = 0; + loop { + let length = input.read(buffer).context("failed to read")?; + if length == 0 { + break; + } + + total_bytes_read += length; + let buffer = &buffer[..length]; + + temp_file.write_all(buffer).context("failed to write all")?; + + hasher.update(buffer); + + if let Some(on_progress) = &mut options.on_progress { + on_progress(total_bytes_read)?; + } + } + + let hash = hasher.finalize(); + let blob_hash = BlobHash(hash); + let blob_path = local_blob_path(brioche, blob_hash); + + if let Some(parent) = blob_path.parent() { + std::fs::create_dir_all(parent)?; + } + + tracing::debug!(overwrite = blob_path.exists(), %blob_hash, "saved blob"); + + temp_file + .set_permissions(blob_permissions()) + .context("failed to set blob permissions")?; + temp_file.set_modified(crate::fs_utils::brioche_epoch())?; + + std::fs::rename(&temp_path, &blob_path).context("failed to rename blob from temp file")?; + + Ok(blob_hash) +} + #[tracing::instrument(skip(brioche, _permit, options), err)] pub async fn save_blob_from_file<'a>( brioche: &Brioche,