diff --git a/src/bufread.rs b/src/bufread.rs index 770cd6b4..8d4ea727 100644 --- a/src/bufread.rs +++ b/src/bufread.rs @@ -28,6 +28,7 @@ pub struct BzDecoder { obj: R, data: Decompress, done: bool, + multi: bool, } impl BzEncoder { @@ -147,8 +148,14 @@ impl BzDecoder { obj: r, data: Decompress::new(false), done: false, + multi: false, } } + + fn multi(mut self, flag: bool) -> BzDecoder { + self.multi = flag; + self + } } impl BzDecoder { @@ -187,7 +194,7 @@ impl BzDecoder { impl Read for BzDecoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { if self.done { - return Ok(0) + return Ok(0); } loop { let (read, consumed, eof, ret); @@ -206,7 +213,12 @@ impl Read for BzDecoder { io::Error::new(io::ErrorKind::InvalidInput, e) })); if ret == Status::StreamEnd { - self.done = true; + if !eof && self.multi { + self.data = Decompress::new(false); + } else { + self.done = true; + } + return Ok(read) } if read > 0 || eof || buf.len() == 0 { @@ -236,3 +248,62 @@ impl AsyncWrite for BzDecoder { self.get_mut().shutdown() } } + +/// A bzip2 streaming decoder that decodes all members of a multistream +/// +/// Wikipedia, particularly, uses bzip2 multistream for their dumps. +pub struct MultiBzDecoder(BzDecoder); + +impl MultiBzDecoder { + /// Creates a new decoder from the given reader. If the bzip2 stream contains multiple members + /// all will be decoded. + pub fn new(r: R) -> MultiBzDecoder { + MultiBzDecoder(BzDecoder::new(r).multi(true)) + } +} + +impl MultiBzDecoder { + /// Acquires a reference to the underlying reader. + pub fn get_ref(&self) -> &R { + self.0.get_ref() + } + + /// Acquires a mutable reference to the underlying stream. + /// + /// Note that mutation of the stream may result in surprising results if + /// this encoder is continued to be used. + pub fn get_mut(&mut self) -> &mut R { + self.0.get_mut() + } + + /// Consumes this decoder, returning the underlying reader. + pub fn into_inner(self) -> R { + self.0.into_inner() + } +} + +impl Read for MultiBzDecoder { + fn read(&mut self, into: &mut [u8]) -> io::Result { + self.0.read(into) + } +} + +#[cfg(feature = "tokio")] +impl AsyncRead for MultiBzDecoder {} + +impl Write for MultiBzDecoder { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.get_mut().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.get_mut().flush() + } +} + +#[cfg(feature = "tokio")] +impl AsyncWrite for MultiBzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} diff --git a/src/read.rs b/src/read.rs index d9c298ff..36b2d31e 100644 --- a/src/read.rs +++ b/src/read.rs @@ -77,8 +77,7 @@ impl Read for BzEncoder { } #[cfg(feature = "tokio")] -impl AsyncRead for BzEncoder { -} +impl AsyncRead for BzEncoder {} impl Write for BzEncoder { fn write(&mut self, buf: &[u8]) -> io::Result { @@ -149,8 +148,7 @@ impl Read for BzDecoder { } #[cfg(feature = "tokio")] -impl AsyncRead for BzDecoder { -} +impl AsyncRead for BzDecoder {} impl Write for BzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { @@ -169,10 +167,74 @@ impl AsyncWrite for BzDecoder { } } +/// A bzip2 streaming decoder that decodes all members of a multistream +/// +/// Wikipedia, particularly, uses bzip2 multistream for their dumps. +pub struct MultiBzDecoder { + inner: bufread::MultiBzDecoder>, +} + +impl MultiBzDecoder { + /// Creates a new decoder from the given reader, immediately parsing the + /// (first) gzip header. If the gzip stream contains multiple members all will + /// be decoded. + pub fn new(r: R) -> MultiBzDecoder { + MultiBzDecoder { + inner: bufread::MultiBzDecoder::new(BufReader::new(r)), + } + } +} + +impl MultiBzDecoder { + /// Acquires a reference to the underlying reader. + pub fn get_ref(&self) -> &R { + self.inner.get_ref().get_ref() + } + + /// Acquires a mutable reference to the underlying stream. + /// + /// Note that mutation of the stream may result in surprising results if + /// this encoder is continued to be used. + pub fn get_mut(&mut self) -> &mut R { + self.inner.get_mut().get_mut() + } + + /// Consumes this decoder, returning the underlying reader. + pub fn into_inner(self) -> R { + self.inner.into_inner().into_inner() + } +} + +impl Read for MultiBzDecoder { + fn read(&mut self, into: &mut [u8]) -> io::Result { + self.inner.read(into) + } +} + +#[cfg(feature = "tokio")] +impl AsyncRead for MultiBzDecoder {} + +impl Write for MultiBzDecoder { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.get_mut().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.get_mut().flush() + } +} + +#[cfg(feature = "tokio")] +impl AsyncWrite for MultiBzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + #[cfg(test)] mod tests { use std::io::prelude::*; - use read::{BzEncoder, BzDecoder}; + use read::{BzEncoder, BzDecoder, MultiBzDecoder}; use Compression; use rand::{thread_rng, Rng}; use rand::distributions::Standard; @@ -256,6 +318,25 @@ mod tests { assert!(d.read(&mut data).unwrap() == 0); } + #[test] + fn multistream_read_till_eof() { + let m = vec![3u8; 128 * 1024 + 1]; + let repeat = 3; + let mut result = Vec::new(); + + for _i in 0..repeat { + let mut c = BzEncoder::new(&m[..], Compression::default()); + c.read_to_end(&mut result).unwrap(); + } + + let mut d = MultiBzDecoder::new(&result[..]); + let mut data = Vec::new(); + + let a = d.read_to_end(&mut data).unwrap(); + let b = m.len() * repeat; + assert!(a == b, "{} {}", a, b); + } + #[test] fn empty() { let r = BzEncoder::new(&[][..], Compression::default());