pipelined extraction #236

Open · cosmicexplorer wants to merge 10 commits into master from pipelined-extract-v2
Conversation

@cosmicexplorer (Contributor) commented Aug 17, 2024

Recreation of #208 to work around GitHub issues.

Problem

ZipArchive::extract() corresponds to the way most zip implementations perform the task, but it's single-threaded. That is appropriate under the assumptions imposed by Rust's Read and Seek traits: mutable access is required, so only one reader can extract file contents at a time. However, most Unix-like operating systems offer a pread() operation that reads at an explicit offset without mutating OS state such as the file offset, so multiple threads can read from the same file handle at once. The Go programming language codifies this ability in its stdlib as io.ReaderAt.
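For illustration (not code from this PR), std already exposes the pread()-style primitive on Unix via the FileExt extension trait:

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // read_at()/read_exact_at() wrap pread(2)

/// Read `len` bytes at `offset` without moving the shared file cursor, so any
/// number of threads can call this concurrently on the same `&File`.
fn read_chunk(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}
```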

Solution

This is a rework of #72 that avoids introducing unnecessary thread pools and creates all output file handles and containing directories up front. For large zips, we want to:

  • create output handles and containing directories up front,
  • split the input file handle into chunks so the constituent file entries can be processed in parallel,
  • for large compressed entries, pipe their contents into a dedicated stream, so that decompression is not interleaved with i/o and does not block quicker, smaller entries later in the file.

src/read/split.rs was created to cover pread() and related operations, while src/read/pipelining.rs holds the high-level logic that splits up entries and performs the pipelined extraction.

Result

  • The parallelism feature was added to the crate to gate the newly added code and API.
  • A dependency on the libc crate was added for #[cfg(all(unix, feature = "parallelism"))] in order to make use of OS-specific functionality.
  • zip::read::split_extract() was added as a new external API to extract &ZipArchive<fs::File> when #[cfg(all(unix, feature = "parallelism"))].

Note that this does not handle symlinks yet, which I plan to add in a followup PR.
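A usage sketch under the new feature gate; the exact signature of split_extract() and its configuration type are illustrative here, not confirmed from the diff:

```rust
#[cfg(all(unix, feature = "parallelism"))]
fn extract_fast(
    zip_path: &std::path::Path,
    dest: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
    let archive = zip::ZipArchive::new(std::fs::File::open(zip_path)?)?;
    // split_extract() takes &ZipArchive<fs::File>: worker threads share the
    // immutable handle and pread() their own regions instead of contending on
    // a single seek cursor.
    zip::read::split_extract(&archive, dest, Default::default())?;
    Ok(())
}
```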

CURRENT BENCHMARK STATUS

On a Linux host (with splice() and optionally copy_file_range()), we get about a 6.5x speedup with 12 decompression threads:

> cargo bench --features parallelism -- extract
running 2 tests
test extract_basic           ... bench: 104,389,978 ns/iter (+/- 5,715,453) = 85 MB/s
test extract_split           ... bench:  16,274,974 ns/iter (+/- 1,530,257) = 546 MB/s

The performance should keep increasing with thread count, up to the number of available CPU cores (this run used a parallelism of 12 on my 16-core laptop). This also works on macOS, the BSDs, and other #[cfg(unix)] platforms.

@cosmicexplorer force-pushed the pipelined-extract-v2 branch 4 times, most recently from 7a45b32 to 5cec332 on August 21, 2024 04:21
@cosmicexplorer (Contributor, Author):

Going to try to get this one in before figuring out the CLI PR.

- initial sketch of lexicographic trie for pipelining
- move path splitting into a submodule
- lex trie can now propagate entry data
- outline handle allocation
- mostly handle files
- mostly handle dirs
- clarify symlink FIXMEs
- do symlink validation
- extract writable dir setting to helper method
- modify args to handle allocation method
- handle allocation test passes
- simplify perms a lot
- outline evaluation
- handle symlinks
- BIGGER CHANGE! add EntryReader/etc
- make initial pipelined extract work
- fix file perms by writing them after finishing the file write
- support directory entries by unix mode as well
- impl split extraction
- remove dependency on reader refactoring
- add dead_code to methods we don't use yet
@cosmicexplorer force-pushed the pipelined-extract-v2 branch 4 times, most recently from b90c9e2 to fa18aa3 on January 16, 2025 21:23
@cosmicexplorer marked this pull request as ready for review on January 17, 2025 00:48
@Pr0methean (Member) left a comment:

Here's a review of what I've read so far. Still needs a fair bit of work, but I'm happy with the overall concept.

}

#[cfg(all(feature = "parallelism", feature = "bzip2", unix))]
const DECOMPRESSION_THREADS: usize = 8;
@Pr0methean (Member):

Use https://docs.rs/num_cpus/latest/num_cpus/ instead of a hard-coded number. Also make sure the archive used for this benchmark contains at least 768 files (4 per VCPU on the EC2 *.48xlarge instances) so that scalable parallelism is properly measured.
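Sketched out, that suggestion looks like this (assuming num_cpus is added as a dev-dependency):

```rust
#[cfg(all(feature = "parallelism", feature = "bzip2", unix))]
fn decompression_threads() -> usize {
    // Scale with the benchmark host instead of hard-coding 8; on an EC2
    // *.48xlarge this yields 192, so the archive needs at least 768 entries
    // (4 per vCPU) for scalable parallelism to be properly exercised.
    num_cpus::get()
}
```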

@@ -0,0 +1,1209 @@
//! Pipelined extraction into a filesystem directory.

@Pr0methean (Member):

Can some of these #[allow]s be removed?

/* The ZIP spec states (APPNOTE 4.4.17) that file paths are in Unix format, and Unix
* filesystems treat a backslash as a normal character. Thus they should be allowed on Unix
* and replaced with \u{fffd} on Windows. */
if entry_path.starts_with('/') {
@Pr0methean (Member):

Absolute paths should be allowed if they're inside the destination directory (e.g. when an OS installer is unzipping the contents of the root filesystem).
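A sketch of that policy (hypothetical helper, not from the PR); real code would also have to normalize `..` components and resolve symlinks before trusting the comparison:

```rust
use std::path::Path;

/// Allow an absolute entry path only when it stays inside the destination,
/// e.g. "/usr/bin/env" is fine when extracting a root-filesystem image to "/".
fn absolute_entry_allowed(dest: &Path, entry_path: &str) -> bool {
    Path::new(entry_path).starts_with(dest)
}
```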

#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct DirEntry<'a, Data> {
    pub properties: Option<Data>,
    pub children: BTreeMap<&'a str, Box<FSEntry<'a, Data>>>,
@Pr0methean (Member):

If we were to Box the BTreeMap instead of its individual entries, what difference would that make to pointer chasing and heap fragmentation?
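The alternative layout being asked about would look roughly like this (types simplified from the PR). Note the BTreeMap's own heap-allocated nodes already break the type recursion, so the per-child Box can be dropped:

```rust
use std::collections::BTreeMap;

pub(crate) enum FSEntry<'a, Data> {
    Dir(DirEntry<'a, Data>),
    File(Data),
}

pub(crate) struct DirEntry<'a, Data> {
    pub properties: Option<Data>,
    // One Box around the whole map instead of one per child: entries then
    // live inline in the BTree nodes, trading one small allocation per child
    // for larger nodes and one less pointer hop per lookup.
    pub children: Box<BTreeMap<&'a str, FSEntry<'a, Data>>>,
}
```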

let next_subdir = cur_dir
    .children
    .entry(component)
    .or_insert_with(|| Box::new(FSEntry::Dir(DirEntry::default())));
@Pr0methean (Member):

Implement From<DirEntry> for FSEntry and make this just DirEntry::default().into().
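As a standalone impl against the PR's FSEntry/DirEntry types, the suggestion is:

```rust
impl<'a, Data> From<DirEntry<'a, Data>> for FSEntry<'a, Data> {
    fn from(dir: DirEntry<'a, Data>) -> Self {
        FSEntry::Dir(dir)
    }
}

// ...which lets the insertion site above collapse to:
// .or_insert_with(|| Box::new(DirEntry::default().into()))
```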

/// i/o error: {0}
Io(#[from] io::Error),
/// zip error: {0}
Zip(#[from] ZipError),
@Pr0methean (Member):

Combine Io and Zip since ZipError already has a variant for io::Error. I recommend this even if you end up having to implement From<io::Error> by hand (which will happen if #[from] isn't transitive).
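A sketch of the combined shape (the enum name is hypothetical, and the PR's doc-comment display messages are approximated here with thiserror's #[error] attribute):

```rust
use std::io;
use zip::result::ZipError;

#[derive(Debug, thiserror::Error)]
pub enum SplitExtractionError {
    /// Single variant: ZipError already wraps io::Error in its Io variant.
    #[error("zip error: {0}")]
    Zip(#[from] ZipError),
}

// #[from] is not transitive, so io::Error is routed through ZipError by hand.
impl From<io::Error> for SplitExtractionError {
    fn from(e: io::Error) -> Self {
        Self::Zip(ZipError::Io(e))
    }
}
```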

let block: MaybeUninit<[u8; mem::size_of::<ZipLocalEntryBlock>()]> =
    MaybeUninit::uninit();
let mut block: [MaybeUninit<u8>; mem::size_of::<ZipLocalEntryBlock>()] =
    unsafe { mem::transmute(block) };
@Pr0methean (Member):

Use pod_read_unaligned to encapsulate the unsafe code. Also, call from_le() to convert the endianness if necessary.
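Roughly what the encapsulated version looks like, assuming ZipLocalEntryBlock can implement bytemuck::Pod and keeps a from_le() conversion like the crate's other parsed blocks:

```rust
use core::mem::size_of;

fn read_local_block(raw: &[u8]) -> ZipLocalEntryBlock {
    // Safe unaligned read; bytemuck checks the length and Pod-ness for us.
    let block: ZipLocalEntryBlock =
        bytemuck::pod_read_unaligned(&raw[..size_of::<ZipLocalEntryBlock>()]);
    // Fields are stored little-endian on disk; fix them up on BE hosts.
    block.from_le()
}
```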

Ok(()) => (),
// If the value was already set in the meantime, ensure it matches.
Err(_) => {
    assert_eq!(*data.data_start.get().unwrap(), data_start);
@Pr0methean (Member):

Change this to debug_assert_eq! for performance reasons.

pub file_range_copy_buffer_length: usize,
/// Size of buffer used to splice contents from a pipe into an output file handle.
///
/// Used on non-Linux platforms without [`splice()`](https://en.wikipedia.org/wiki/Splice_(system_call)).
@Pr0methean (Member):

This buffer isn't necessary on any Unix; see https://stackoverflow.com/a/10330172.
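For context on the Linux path this buffer replaces, here is an illustrative use of the libc dependency the PR adds (not the PR's own code):

```rust
use std::{io, ptr};

/// Move `len` bytes from a pipe to a file descriptor entirely in-kernel via
/// splice(2); no userspace copy buffer is involved on this path.
#[cfg(target_os = "linux")]
fn splice_all(pipe_fd: i32, out_fd: i32, mut len: usize) -> io::Result<()> {
    while len > 0 {
        let n = unsafe {
            libc::splice(pipe_fd, ptr::null_mut(), out_fd, ptr::null_mut(), len, 0)
        };
        if n < 0 {
            return Err(io::Error::last_os_error());
        } else if n == 0 {
            // The pipe's write end closed before `len` bytes arrived.
            return Err(io::ErrorKind::UnexpectedEof.into());
        }
        len -= n as usize;
    }
    Ok(())
}
```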

Comment on lines +766 to +775
move || match f() {
    Ok(()) => (),
    #[allow(clippy::single_match)]
    Err(e) => match err_sender.send(e) {
        Ok(()) => (),
        /* We use an async sender, so this should only error if the receiver has hung
         * up, which occurs when we return a previous error from the main thread. */
        Err(mpsc::SendError(_)) => (),
    },
}
@Pr0methean (Member):

Suggested change:
move || if let Err(e) = f() {
    /* We use an async sender, so this should only error if the receiver has hung up because of a previous error. */
    let _ = err_sender.send(e);
}
}
