Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update unarchive to use tar crate instead of tokio_tar #127

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/brioche-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ termwiz = "0.22.0"
thiserror = "1.0.51"
tick-encoding = "0.1.2"
tokio = { version = "1.35.0", features = ["full", "tracing"] }
tokio-tar = "0.3.1"
tokio-util = { version = "0.7.10", features = ["compat", "full"] }
toml = "0.8.8"
tower-lsp = "0.20.0"
Expand All @@ -66,6 +65,7 @@ walkdir = "2.5.0"
petgraph = "0.6.5"
wax = { version = "0.6.0", default-features = false }
gix = { version = "0.66.0", features = ["blocking-network-client", "blocking-http-transport-reqwest"] }
tar = "0.4.42"

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
201 changes: 125 additions & 76 deletions crates/brioche-core/src/bake/unarchive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::{collections::BTreeMap, sync::Arc};

use anyhow::Context as _;
use bstr::BString;
use futures::TryStreamExt as _;
use tracing::Instrument;

use crate::{
blob::BlobHash,
recipe::{Artifact, Directory, File, Meta, Unarchive, WithMeta},
Brioche,
};
Expand Down Expand Up @@ -40,84 +39,121 @@ pub async fn bake_unarchive(

let decompressed_archive_file = unarchive.compression.decompress(archive_file);

let mut archive = tokio_tar::Archive::new(decompressed_archive_file);
let mut archive_entries = archive.entries()?;
let mut directory_entries = BTreeMap::<BString, WithMeta<Artifact>>::new();
let mut buffer = Vec::new();

let save_blobs_future = async {
while let Some(archive_entry) = archive_entries.try_next().await? {
let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned());
let entry_mode = archive_entry.header().mode()?;

let position = archive_entry.raw_file_position();
let estimated_progress = position as f64 / (uncompressed_archive_size as f64).max(1.0);
let progress_percent = (estimated_progress * 100.0).min(99.0) as u8;
brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Unarchive { progress_percent },
);
let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);

let mut permit = crate::blob::get_save_blob_permit().await?;
let process_archive_task = tokio::task::spawn_blocking({
let brioche = brioche.clone();
move || {
let decompressed_archive_file =
tokio_util::io::SyncIoBridge::new(decompressed_archive_file);
let mut archive = tar::Archive::new(decompressed_archive_file);

let mut buffer = Vec::new();

for archive_entry in archive.entries()? {
let archive_entry = archive_entry?;
let entry_path = bstr::BString::new(archive_entry.path_bytes().into_owned());
let entry_mode = archive_entry.header().mode()?;

let position = archive_entry.raw_file_position();
let estimated_progress =
position as f64 / (uncompressed_archive_size as f64).max(1.0);
let progress_percent = (estimated_progress * 100.0).min(99.0) as u8;
brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Unarchive { progress_percent },
);

let entry = match archive_entry.header().entry_type() {
tar::EntryType::Regular => {
let entry_blob_hash = crate::blob::save_blob_from_reader_sync(
&brioche,
&mut permit,
archive_entry,
crate::blob::SaveBlobOptions::new(),
&mut buffer,
)?;
let executable = entry_mode & 0o100 != 0;

ArchiveEntry::File {
content_blob: entry_blob_hash,
executable,
}
}
tar::EntryType::Symlink => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
format!(
"unsupported tar archive: no link name for symlink entry at {}",
entry_path
)
})?;

let entry_artifact = match archive_entry.header().entry_type() {
tokio_tar::EntryType::Regular => {
let mut permit = crate::blob::get_save_blob_permit().await?;
let entry_blob_hash = crate::blob::save_blob_from_reader(
brioche,
&mut permit,
archive_entry,
crate::blob::SaveBlobOptions::new(),
&mut buffer,
)
.await?;
let executable = entry_mode & 0o100 != 0;

Some(Artifact::File(File {
content_blob: entry_blob_hash,
executable,
resources: Directory::default(),
}))
}
tokio_tar::EntryType::Symlink => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
format!(
"unsupported tar archive: no link name for symlink entry at {}",
ArchiveEntry::Symlink {
target: link_name.into_owned().into(),
}
}
tar::EntryType::Link => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
format!(
"unsupported tar archive: no link name for hardlink entry at {}",
entry_path
)
})?;

ArchiveEntry::Link {
link_name: link_name.into_owned().into(),
}
}
tar::EntryType::Directory => ArchiveEntry::Directory,
tar::EntryType::XGlobalHeader | tar::EntryType::XHeader => {
// Ignore
continue;
}
other => {
anyhow::bail!(
"unsupported tar archive: unsupported entry type {:?} at {}",
other,
entry_path
)
})?;
);
}
};

Some(Artifact::Symlink {
target: link_name.into_owned().into(),
})
}
tokio_tar::EntryType::Link => {
let link_name = archive_entry.link_name_bytes().with_context(|| {
entry_tx.blocking_send((entry_path, entry))?;
}

anyhow::Ok(())
}
});
let process_archive_task = async {
process_archive_task.await??;
anyhow::Result::<()>::Ok(())
};

let build_directory_fut = async {
let mut directory_entries = BTreeMap::<BString, WithMeta<Artifact>>::new();

while let Some((entry_path, entry)) = entry_rx.recv().await {
let entry_artifact = match entry {
ArchiveEntry::File {
content_blob,
executable,
} => Some(Artifact::File(File {
content_blob,
executable,
resources: Directory::default(),
})),
ArchiveEntry::Symlink { target } => Some(Artifact::Symlink { target }),
ArchiveEntry::Link { link_name } => {
let linked_entry = directory_entries.get(&link_name).with_context(|| {
format!(
"unsupported tar archive: no link name for hardlink entry at {}",
entry_path
)
})?;
let linked_entry =
directory_entries.get(link_name.as_ref()).with_context(|| {
format!(
"unsupported tar archive: could not find target for link entry at {}",
entry_path
)
})?;

})?;
Some(linked_entry.value.clone())
}
tokio_tar::EntryType::Directory => Some(Artifact::Directory(Directory::default())),
tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => {
// Ignore
None
}
other => {
anyhow::bail!(
"unsupported tar archive: unsupported entry type {:?} at {}",
other,
entry_path
);
}
ArchiveEntry::Directory => Some(Artifact::Directory(Directory::default())),
};

let entry_path = crate::fs_utils::logical_path_bytes(&entry_path);
Expand All @@ -144,13 +180,26 @@ pub async fn bake_unarchive(
},
);

anyhow::Ok(())
}
.instrument(tracing::info_span!("save_blobs"));
let directory = Directory::create(brioche, &directory_entries).await?;

save_blobs_future.await?;
anyhow::Ok(directory)
};

let directory = Directory::create(brioche, &directory_entries).await?;
let (_, directory) = tokio::try_join!(process_archive_task, build_directory_fut)?;

Ok(directory)
}

enum ArchiveEntry {
File {
content_blob: BlobHash,
executable: bool,
},
Symlink {
target: BString,
},
Link {
link_name: BString,
},
Directory,
}
Loading