Skip to content

Commit

Permalink
merge MultiGzDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
quininer committed Feb 15, 2019
1 parent 6512539 commit 9dc0d9e
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 82 deletions.
130 changes: 48 additions & 82 deletions src/gz/bufread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ impl<R: BufRead + Write> Write for GzEncoder<R> {
pub struct GzDecoder<R> {
inner: GzState,
header: GzHeader,
reader: CrcReader<deflate::bufread::DeflateDecoder<R>>
reader: CrcReader<deflate::bufread::DeflateDecoder<R>>,
multi: bool
}

#[derive(Debug)]
Expand All @@ -467,6 +468,7 @@ impl<R: BufRead> GzDecoder<R> {
let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher);

GzDecoder {
multi: false,
inner: if let Err(err) = result {
GzState::Err(err)
} else {
Expand All @@ -486,9 +488,15 @@ impl<R: BufRead> GzDecoder<R> {
hasher: Hasher::new()
},
header: GzHeader::default(),
reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r))
reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)),
multi: false
}
}

/// Sets whether this decoder keeps going after the first gzip member.
///
/// With `flag` set to `true`, once a member's trailer has been checked the
/// decoder looks for further input and, if any is present, starts parsing
/// another gzip header. With `false`, decoding ends after the first member.
///
/// Consumes the decoder and returns it, so the call can be chained onto a
/// constructor.
pub fn multi(self, flag: bool) -> GzDecoder<R> {
    let mut decoder = self;
    decoder.multi = flag;
    decoder
}
}

impl<R> GzDecoder<R> {
Expand Down Expand Up @@ -521,10 +529,11 @@ impl<R> GzDecoder<R> {

impl<R: BufRead> Read for GzDecoder<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
let GzDecoder { inner, header, reader } = self;
let GzDecoder { inner, header, reader, multi } = self;

enum Next {
None,
Header,
Body,
Finished,
Err(io::Error),
Expand Down Expand Up @@ -568,8 +577,21 @@ impl<R: BufRead> Read for GzDecoder<R> {
next = Next::Err(corrupt());
} else if amt != reader.crc().amount() {
next = Next::Err(corrupt());
} else {
} else if !*multi {
next = Next::End;
} else {
match reader.get_mut().get_mut().fill_buf() {
Ok(buf) => if buf.is_empty() {
next = Next::End;
} else {
next = Next::Header;
},
Err(err) => if io::ErrorKind::WouldBlock == err.kind() {
return Err(err);
} else {
next = Next::Err(err);
}
}
}
},
GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())),
Expand All @@ -578,6 +600,16 @@ impl<R: BufRead> Read for GzDecoder<R> {

match mem::replace(&mut next, Next::None) {
Next::None => (),
Next::Header => {
reader.reset();
reader.get_mut().reset_data();
*header = GzHeader::default();
*inner = GzState::Header {
state: GzHeaderState::Header(0, [0; 10]),
flag: 0,
hasher: Hasher::new()
};
},
Next::Body => *inner = GzState::Body,
Next::Finished => *inner = GzState::Finished(0, [0; 8]),
Next::Err(err) => {
Expand Down Expand Up @@ -641,117 +673,51 @@ impl<R: BufRead + Write> Write for GzDecoder<R> {
/// }
/// ```
#[derive(Debug)]
pub struct MultiGzDecoder<R> {
inner: CrcReader<deflate::bufread::DeflateDecoder<R>>,
header: io::Result<GzHeader>,
finished: bool,
}
pub struct MultiGzDecoder<R>(GzDecoder<R>);

impl<R: BufRead> MultiGzDecoder<R> {
/// Creates a new decoder from the given reader, immediately parsing the
/// (first) gzip header. If the gzip stream contains multiple members all will
/// be decoded.
pub fn new(mut r: R) -> MultiGzDecoder<R> {
let header = read_gz_header(&mut r);

let flate = deflate::bufread::DeflateDecoder::new(r);
MultiGzDecoder {
inner: CrcReader::new(flate),
header: header,
finished: false,
}
pub fn new(r: R) -> MultiGzDecoder<R> {
MultiGzDecoder(GzDecoder::new(r).multi(true))
}

fn finish_member(&mut self) -> io::Result<usize> {
if self.finished {
return Ok(0);
}
let ref mut buf = [0u8; 8];
{
let mut len = 0;

while len < buf.len() {
match self.inner.get_mut().get_mut().read(&mut buf[len..])? {
0 => return Err(corrupt()),
n => len += n,
}
}
}

let crc = ((buf[0] as u32) << 0)
| ((buf[1] as u32) << 8)
| ((buf[2] as u32) << 16)
| ((buf[3] as u32) << 24);
let amt = ((buf[4] as u32) << 0)
| ((buf[5] as u32) << 8)
| ((buf[6] as u32) << 16)
| ((buf[7] as u32) << 24);
if crc != self.inner.crc().sum() as u32 {
return Err(corrupt());
}
if amt != self.inner.crc().amount() {
return Err(corrupt());
}
let remaining = match self.inner.get_mut().get_mut().fill_buf() {
Ok(b) => {
if b.is_empty() {
self.finished = true;
return Ok(0);
} else {
b.len()
}
}
Err(e) => return Err(e),
};

let next_header = read_gz_header(self.inner.get_mut().get_mut());
drop(mem::replace(&mut self.header, next_header));
self.inner.reset();
self.inner.get_mut().reset_data();

Ok(remaining)
/// Creates a new multi-member decoder from the given reader.
///
/// The wrapped `GzDecoder` is built with `GzDecoder::new2` and has its
/// multi-member flag enabled, so if the gzip stream contains multiple
/// members all will be decoded.
pub fn new2(r: R) -> MultiGzDecoder<R> {
    let decoder = GzDecoder::new2(r).multi(true);
    MultiGzDecoder(decoder)
}
}

impl<R> MultiGzDecoder<R> {
/// Returns the current header associated with this stream, if it's valid
pub fn header(&self) -> Option<&GzHeader> {
self.header.as_ref().ok()
self.0.header()
}

/// Acquires a reference to the underlying reader.
pub fn get_ref(&self) -> &R {
self.inner.get_ref().get_ref()
self.0.get_ref()
}

/// Acquires a mutable reference to the underlying stream.
///
/// Note that mutating the stream may produce surprising results if
/// this decoder continues to be used.
pub fn get_mut(&mut self) -> &mut R {
self.inner.get_mut().get_mut()
self.0.get_mut()
}

/// Consumes this decoder, returning the underlying reader.
pub fn into_inner(self) -> R {
self.inner.into_inner().into_inner()
self.0.into_inner()
}
}

impl<R: BufRead> Read for MultiGzDecoder<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
if let Err(ref mut e) = self.header {
let another_error = io::ErrorKind::Other.into();
return Err(mem::replace(e, another_error));
}
match self.inner.read(into)? {
0 => match self.finish_member() {
Ok(0) => Ok(0),
Ok(_) => self.read(into),
Err(e) => Err(e),
},
n => Ok(n),
}
self.0.read(into)
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/gz/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ impl<R: Read> GzDecoder<R> {
inner: bufread::GzDecoder::new2(BufReader::new(r)),
}
}

/// Forwards `flag` to the `multi` setting of the wrapped
/// `bufread::GzDecoder` and returns the reconfigured decoder.
///
/// Consumes `self`, so the call can be chained onto a constructor.
pub fn multi(self, flag: bool) -> GzDecoder<R> {
    let inner = self.inner.multi(flag);
    GzDecoder { inner }
}
}

impl<R> GzDecoder<R> {
Expand Down Expand Up @@ -241,6 +247,14 @@ impl<R: Read> MultiGzDecoder<R> {
inner: bufread::MultiGzDecoder::new(BufReader::new(r)),
}
}

/// Creates a new decoder from the given reader.
///
/// The reader is wrapped in a `BufReader` and handed to
/// `bufread::MultiGzDecoder::new2`. If the gzip stream contains multiple
/// members all will be decoded.
pub fn new2(r: R) -> MultiGzDecoder<R> {
    let inner = bufread::MultiGzDecoder::new2(BufReader::new(r));
    MultiGzDecoder { inner }
}
}

impl<R> MultiGzDecoder<R> {
Expand Down
16 changes: 16 additions & 0 deletions tests/async-reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,19 @@ fn test_gz_asyncread() {

assert_eq!(content, expected);
}

#[test]
fn test_multi_gz_asyncread() {
    // Decode `tests/multi.gz` via `GzDecoder::new2` with multi-member mode
    // enabled, reading through the `BadReader` / `AssertAsync` wrappers.
    let archive = File::open("tests/multi.gz").unwrap();
    let decoder = GzDecoder::new2(BadReader::new(archive)).multi(true);

    let fut = read_to_end(AssertAsync(decoder), Vec::new());
    let (_, content) = AlwaysNotify(fut).wait().unwrap();

    // The decoded bytes must match the plain-text fixture exactly.
    let mut expected = Vec::new();
    let mut plain = File::open("tests/multi.txt").unwrap();
    plain.read_to_end(&mut expected).unwrap();

    assert_eq!(content, expected);
}

0 comments on commit 9dc0d9e

Please sign in to comment.