Skip to content

Commit

Permalink
Streaming compression
Browse files Browse the repository at this point in the history
  • Loading branch information
kornelski committed Oct 23, 2022
1 parent f421f08 commit 237a6de
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 126 deletions.
73 changes: 27 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ path = "src/main.rs"
doc = false

[dependencies]
flate2 = "1.0.24"
getopts = "0.2.21"
md5 = "0.7.0"
quick-error = "2.0.1"
Expand All @@ -47,7 +48,6 @@ serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.85"
tar = "0.4.38"
toml = "0.5.9"
zopfli = "0.7.1"
glob = "0.3.0"
ar = "0.9.0"
cargo_toml = "0.13.0"
Expand Down
141 changes: 99 additions & 42 deletions src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,74 @@
use crate::error::*;
use std::io::{Read, Write};
use std::io::{Read, BufWriter};
use std::io;
use std::ops;
use std::process::ChildStdin;
use std::process::{Command, Stdio};

enum Writer {
#[cfg(feature = "lzma")]
Xz(xz2::write::XzEncoder<Vec<u8>>),
StdIn(BufWriter<ChildStdin>),
#[cfg(not(feature = "lzma"))]
Gz(flate2::write::GzEncoder<Vec<u8>>),
}

pub struct Compressor {
writer: Writer,
ret: Box<dyn FnOnce(Writer) -> io::Result<Compressed> + Send + Sync>,
pub uncompressed_size: usize,
}

impl io::Write for Compressor {
fn flush(&mut self) -> io::Result<()> {
match &mut self.writer {
#[cfg(feature = "lzma")]
Writer::Xz(w) => w.flush(),
#[cfg(not(feature = "lzma"))]
Writer::Gz(w) => w.flush(),
Writer::StdIn(w) => w.flush(),
}
}

fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = match &mut self.writer {
#[cfg(feature = "lzma")]
Writer::Xz(w) => w.write(buf),
#[cfg(not(feature = "lzma"))]
Writer::Gz(w) => w.write(buf),
Writer::StdIn(w) => w.write(buf),
}?;
self.uncompressed_size += len;
Ok(len)
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match &mut self.writer {
#[cfg(feature = "lzma")]
Writer::Xz(w) => w.write_all(buf),
#[cfg(not(feature = "lzma"))]
Writer::Gz(w) => w.write_all(buf),
Writer::StdIn(w) => w.write_all(buf),
}?;
self.uncompressed_size += buf.len();
Ok(())
}
}

impl Compressor {
fn new(writer: Writer, ret: impl FnOnce(Writer) -> io::Result<Compressed> + Send + Sync + 'static) -> Self {
Self {
writer,
ret: Box::new(ret),
uncompressed_size: 0,
}
}

pub fn finish(self) -> CDResult<Compressed> {
(self.ret)(self.writer).map_err(From::from)
}
}

pub enum Compressed {
Gz(Vec<u8>),
Xz(Vec<u8>),
Expand All @@ -27,7 +93,7 @@ impl Compressed {
}
}

fn system_xz(data: &[u8], fast: bool) -> CDResult<Compressed> {
fn system_xz(fast: bool) -> CDResult<Compressor> {
let mut child = Command::new("xz")
.arg(if fast { "-1" } else { "-6" })
.stdin(Stdio::piped())
Expand All @@ -37,69 +103,60 @@ fn system_xz(data: &[u8], fast: bool) -> CDResult<Compressed> {
.map_err(|e| CargoDebError::CommandFailed(e, "xz"))?;
let mut stdout = child.stdout.take().unwrap();

let capacity = data.len() / 2;
let t = std::thread::spawn(move || {
let mut buf = Vec::with_capacity(capacity);
let mut buf = Vec::new();
stdout.read_to_end(&mut buf).map(|_| buf)
});
child.stdin.take().unwrap().write_all(data)?; // This has to close stdin
Ok(Compressed::Xz(t.join().unwrap()?))

let stdin = BufWriter::with_capacity(1<<16, child.stdin.take().unwrap());
Ok(Compressor::new(Writer::StdIn(stdin), move |stdin| {
drop(stdin);
child.wait()?;
t.join().unwrap().map(Compressed::Xz)
}))
}

/// Compresses data using the [native Rust implementation of Zopfli](https://github.com/carols10cents/zopfli).
#[cfg(not(feature = "lzma"))]
pub fn xz_or_gz(data: &[u8], fast: bool, with_system_xz: bool) -> CDResult<Compressed> {
pub fn xz_or_gz(fast: bool, with_system_xz: bool) -> CDResult<Compressor> {
// Very old dpkg doesn't support LZMA, so use it only if expliclty enabled
if with_system_xz {
return system_xz(data, fast);
return system_xz(fast);
}

use std::num::NonZeroU8;
use zopfli::{Format, Options};

// Compressed data is typically half to a third the original size
let mut compressed = Vec::with_capacity(data.len() >> 1);
zopfli::compress(
&if fast {
Options {
iteration_count: NonZeroU8::new(5).unwrap(),
maximum_block_splits: 5,
}
} else {
Options::default()
},
&Format::Gzip,
data,
&mut compressed,
)?;

Ok(Compressed::Gz(compressed))
use flate2::Compression;
use flate2::write::GzEncoder;

let writer = GzEncoder::new(Vec::new(), if fast { Compression::fast() } else { Compression::best() });

Ok(Compressor::new(Writer::Gz(writer), move |writer| {
match writer {
Writer::Gz(w) => Ok(Compressed::Gz(w.finish()?)),
_ => unreachable!(),
}
}))
}

/// Compresses data using the xz2 library
#[cfg(feature = "lzma")]
pub fn xz_or_gz(data: &[u8], fast: bool, with_system_xz: bool) -> CDResult<Compressed> {
pub fn xz_or_gz(fast: bool, with_system_xz: bool) -> CDResult<Compressor> {
if with_system_xz {
return system_xz(data, fast);
return system_xz(fast);
}

use xz2::stream;
use xz2::write::XzEncoder;

// Compressed data is typically half to a third the original size
let buf = Vec::with_capacity(data.len() >> 1);

// Compression level 6 is a good trade off between size and [ridiculously] long compression time
let encoder = stream::MtStreamBuilder::new()
let encoder = xz2::stream::MtStreamBuilder::new()
.threads(num_cpus::get() as u32)
.preset(if fast { 1 } else { 6 })
.encoder()
.map_err(CargoDebError::LzmaCompressionError)?;

let mut writer = XzEncoder::new_stream(buf, encoder);
writer.write_all(data).map_err(CargoDebError::Io)?;

let compressed = writer.finish().map_err(CargoDebError::Io)?;
let writer = xz2::write::XzEncoder::new_stream(Vec::new(), encoder);

Ok(Compressed::Xz(compressed))
Ok(Compressor::new(Writer::Xz(writer), |writer| {
match writer {
Writer::Xz(w) => w.finish().map(Compressed::Xz),
_ => unreachable!(),
}
}))
}
Loading

0 comments on commit 237a6de

Please sign in to comment.