Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move from tokio-tar to tar-rs library to fix an issue when peeling archive. #117

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/brioche-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
anyhow = { version = "1.0.75", features = ["backtrace"] }
async-compression = { version = "0.4.5", features = ["tokio", "bzip2", "gzip", "xz", "zstd"] }
async-compression = { version = "0.4.5", features = ["bzip2", "gzip", "xz", "zstd"] }
async-recursion = "1.0.5"
biome_formatter = "0.5.7"
biome_js_formatter = "0.5.7"
Expand Down Expand Up @@ -48,12 +48,13 @@ sha2 = "0.10.8"
sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "json"] }
strum = { version = "0.26.3", features = ["derive"] }
superconsole = "0.2.0"
tar = "0.4.41"
termwiz = "0.22.0"
thiserror = "1.0.51"
tick-encoding = "0.1.2"
tokio = { version = "1.35.0", features = ["full", "tracing"] }
tokio = { version = "1.39.3", features = ["full", "tracing"] }
tokio-tar = "0.3.1"
tokio-util = { version = "0.7.10", features = ["compat", "full"] }
tokio-util = { version = "0.7.11", features = ["compat", "full"] }
toml = "0.8.8"
tower-lsp = "0.20.0"
tracing = "0.1.40"
Expand Down
32 changes: 19 additions & 13 deletions crates/brioche-core/src/bake/unarchive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc};

use anyhow::Context as _;
use bstr::BString;
use futures::TryStreamExt as _;
use tokio::io::AsyncReadExt;
use tracing::Instrument;

use crate::{
Expand Down Expand Up @@ -38,15 +38,21 @@ pub async fn bake_unarchive(
let uncompressed_archive_size = archive_file.metadata().await?.len();
let archive_file = tokio::io::BufReader::new(archive_file);

let decompressed_archive_file = unarchive.compression.decompress(archive_file);
let mut decompressed_archive_file = unarchive.compression.decompress(archive_file);
let mut decompressed_archive_buffer = Vec::new();
decompressed_archive_file
.read(&mut decompressed_archive_buffer)
.await?;

let mut archive = tokio_tar::Archive::new(decompressed_archive_file);
let mut archive_entries = archive.entries()?;
let archive = std::sync::Mutex::new(std::sync::Arc::new(tar::Archive::new(
std::io::Cursor::new(decompressed_archive_buffer),
)));
let mut archive_entries = archive.lock()?.entries()?;
let mut directory_entries = BTreeMap::<BString, WithMeta<Artifact>>::new();
let mut buffer = Vec::new();

let save_blobs_future = async {
while let Some(archive_entry) = archive_entries.try_next().await? {
while let Some(archive_entry) = archive_entries.next().transpose()? {
let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned());
let entry_mode = archive_entry.header().mode()?;

Expand All @@ -59,7 +65,7 @@ pub async fn bake_unarchive(
);

let entry_artifact = match archive_entry.header().entry_type() {
tokio_tar::EntryType::Regular => {
tar::EntryType::Regular => {
let mut permit = crate::blob::get_save_blob_permit().await?;
let entry_blob_hash = crate::blob::save_blob_from_reader(
brioche,
Expand All @@ -77,7 +83,7 @@ pub async fn bake_unarchive(
resources: Directory::default(),
}))
}
tokio_tar::EntryType::Symlink => {
tar::EntryType::Symlink => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
format!(
"unsupported tar archive: no link name for symlink entry at {}",
Expand All @@ -89,7 +95,7 @@ pub async fn bake_unarchive(
target: link_name.into_owned().into(),
})
}
tokio_tar::EntryType::Link => {
tar::EntryType::Link => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
format!(
"unsupported tar archive: no link name for hardlink entry at {}",
Expand All @@ -99,15 +105,15 @@ pub async fn bake_unarchive(
let linked_entry =
directory_entries.get(link_name.as_ref()).with_context(|| {
format!(
"unsupported tar archive: could not find target for link entry at {}",
entry_path
)
"unsupported tar archive: could not find target for link entry at {}",
entry_path
)
})?;

Some(linked_entry.value.clone())
}
tokio_tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())),
tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => {
tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())),
tar::EntryType::XGlobalHeader | tar::EntryType::XHeader => {
// Ignore
None
}
Expand Down
109 changes: 109 additions & 0 deletions crates/brioche-core/src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,115 @@ pub async fn save_blob_from_reader<'a, R>(
mut options: SaveBlobOptions<'a>,
buffer: &mut Vec<u8>,
) -> anyhow::Result<BlobHash>
where
R: std::io::Read + Unpin,
{
anyhow::ensure!(!options.remove_input, "cannot remove input from reader");

let mut hasher = blake3::Hasher::new();
let mut validation_hashing = options
.expected_hash
.as_ref()
.map(|validate_hash| (validate_hash, super::Hasher::for_hash(validate_hash)));

let temp_dir = brioche.home.join("blobs-temp");
tokio::fs::create_dir_all(&temp_dir).await.unwrap();
let temp_path = temp_dir.join(ulid::Ulid::new().to_string());
let mut temp_file = tokio::fs::File::create(&temp_path)
.await
.context("failed to open temp file")?;

tracing::trace!(temp_path = %temp_path.display(), "saving blob");

let mut buffer = vec![0u8; 1024 * 1024];
let mut total_bytes_read = 0;
loop {
let length = input.read(&mut buffer).context("failed to read")?;
if length == 0 {
break;
}

total_bytes_read += length;
let buffer = &buffer[..length];

temp_file
.write_all(buffer)
.await
.context("failed to write all")?;

hasher.update(buffer);

if let Some((_, validate_hasher)) = &mut validation_hashing {
validate_hasher.update(buffer);
}

if let Some(on_progress) = &mut options.on_progress {
on_progress(total_bytes_read)?;
}
}

let hash = hasher.finalize();
let blob_hash = BlobHash(hash);
let blob_path = local_blob_path(brioche, blob_hash);

if let Some((expected_hash, validate_hasher)) = validation_hashing {
let actual_hash = validate_hasher.finish()?;

if *expected_hash != actual_hash {
anyhow::bail!("expected hash {} but got {}", expected_hash, actual_hash);
}

let expected_hash_string = expected_hash.to_string();
let blob_hash_string = blob_hash.to_string();

let mut db_conn = brioche.db_conn.lock().await;
let mut db_transaction = db_conn.begin().await?;
sqlx::query!(
r"
INSERT INTO blob_aliases (hash, blob_hash) VALUES (?, ?)
ON CONFLICT (hash) DO UPDATE SET blob_hash = ?
",
expected_hash_string,
blob_hash_string,
blob_hash_string,
)
.execute(&mut *db_transaction)
.await?;
db_transaction.commit().await?;
drop(db_conn);
}

if let Some(parent) = blob_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}

tracing::debug!(overwrite = blob_path.exists(), %blob_hash, "saved blob");

temp_file
.set_permissions(blob_permissions())
.await
.context("failed to set blob permissions")?;
let temp_file = temp_file.into_std().await;
tokio::task::spawn_blocking(move || {
temp_file.set_modified(crate::fs_utils::brioche_epoch())?;
anyhow::Ok(())
})
.await??;

tokio::fs::rename(&temp_path, &blob_path)
.await
.context("failed to rename blob from temp file")?;

Ok(blob_hash)
}

#[tracing::instrument(skip_all)]
pub async fn save_blob_from_async_reader<'a, R>(
brioche: &Brioche,
_permit: SaveBlobPermit<'_>,
mut input: R,
mut options: SaveBlobOptions<'a>,
) -> anyhow::Result<BlobHash>
where
R: tokio::io::AsyncRead + Unpin,
{
Expand Down
2 changes: 1 addition & 1 deletion crates/brioche-core/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn download(
Ok(())
});

let blob_hash = crate::blob::save_blob_from_reader(
let blob_hash = crate::blob::save_blob_from_async_reader(
brioche,
&mut save_blob_permit,
download_stream,
Expand Down
Loading