From 9dc0d9eebfe6f6528de8cc940c1fa19d700da2dc Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 15:24:08 +0800 Subject: [PATCH] merge MultiGzDecoder --- src/gz/bufread.rs | 130 ++++++++++++++++-------------------------- src/gz/read.rs | 14 +++++ tests/async-reader.rs | 16 ++++++ 3 files changed, 78 insertions(+), 82 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index edc7f2e25..4e728e837 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -440,7 +440,8 @@ impl Write for GzEncoder { pub struct GzDecoder { inner: GzState, header: GzHeader, - reader: CrcReader> + reader: CrcReader>, + multi: bool } #[derive(Debug)] @@ -467,6 +468,7 @@ impl GzDecoder { let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher); GzDecoder { + multi: false, inner: if let Err(err) = result { GzState::Err(err) } else { @@ -486,9 +488,15 @@ impl GzDecoder { hasher: Hasher::new() }, header: GzHeader::default(), - reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)) + reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), + multi: false } } + + pub fn multi(mut self, flag: bool) -> GzDecoder { + self.multi = flag; + self + } } impl GzDecoder { @@ -521,10 +529,11 @@ impl GzDecoder { impl Read for GzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - let GzDecoder { inner, header, reader } = self; + let GzDecoder { inner, header, reader, multi } = self; enum Next { None, + Header, Body, Finished, Err(io::Error), @@ -568,8 +577,21 @@ impl Read for GzDecoder { next = Next::Err(corrupt()); } else if amt != reader.crc().amount() { next = Next::Err(corrupt()); - } else { + } else if !*multi { next = Next::End; + } else { + match reader.get_mut().get_mut().fill_buf() { + Ok(buf) => if buf.is_empty() { + next = Next::End; + } else { + next = Next::Header; + }, + Err(err) => if io::ErrorKind::WouldBlock == err.kind() { + return Err(err); + } else { + next = Next::Err(err); + } + } } }, GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())), @@ -578,6 +600,16 @@ impl Read for GzDecoder { match mem::replace(&mut next, Next::None) { Next::None => (), + Next::Header => { + reader.reset(); + reader.get_mut().reset_data(); + *header = GzHeader::default(); + *inner = GzState::Header { + state: GzHeaderState::Header(0, [0; 10]), + flag: 0, + hasher: Hasher::new() + }; + }, Next::Body => *inner = GzState::Body, Next::Finished => *inner = GzState::Finished(0, [0; 8]), Next::Err(err) => { @@ -641,87 +673,32 @@ impl Write for GzDecoder { /// } /// ``` #[derive(Debug)] -pub struct MultiGzDecoder { - inner: CrcReader>, - header: io::Result, - finished: bool, -} +pub struct MultiGzDecoder(GzDecoder); impl MultiGzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// (first) gzip header. If the gzip stream contains multiple members all will /// be decoded. - pub fn new(mut r: R) -> MultiGzDecoder { - let header = read_gz_header(&mut r); - - let flate = deflate::bufread::DeflateDecoder::new(r); - MultiGzDecoder { - inner: CrcReader::new(flate), - header: header, - finished: false, - } + pub fn new(r: R) -> MultiGzDecoder { + MultiGzDecoder(GzDecoder::new(r).multi(true)) } - fn finish_member(&mut self) -> io::Result { - if self.finished { - return Ok(0); - } - let ref mut buf = [0u8; 8]; - { - let mut len = 0; - - while len < buf.len() { - match self.inner.get_mut().get_mut().read(&mut buf[len..])? { - 0 => return Err(corrupt()), - n => len += n, - } - } - } - - let crc = ((buf[0] as u32) << 0) - | ((buf[1] as u32) << 8) - | ((buf[2] as u32) << 16) - | ((buf[3] as u32) << 24); - let amt = ((buf[4] as u32) << 0) - | ((buf[5] as u32) << 8) - | ((buf[6] as u32) << 16) - | ((buf[7] as u32) << 24); - if crc != self.inner.crc().sum() as u32 { - return Err(corrupt()); - } - if amt != self.inner.crc().amount() { - return Err(corrupt()); - } - let remaining = match self.inner.get_mut().get_mut().fill_buf() { - Ok(b) => { - if b.is_empty() { - self.finished = true; - return Ok(0); - } else { - b.len() - } - } - Err(e) => return Err(e), - }; - - let next_header = read_gz_header(self.inner.get_mut().get_mut()); - drop(mem::replace(&mut self.header, next_header)); - self.inner.reset(); - self.inner.get_mut().reset_data(); - - Ok(remaining) + /// Creates a new decoder from the given reader. + /// If the gzip stream contains multiple members all will be decoded. + pub fn new2(r: R) -> MultiGzDecoder { + MultiGzDecoder(GzDecoder::new2(r).multi(true)) } } impl MultiGzDecoder { /// Returns the current header associated with this stream, if it's valid pub fn header(&self) -> Option<&GzHeader> { - self.header.as_ref().ok() + self.0.header() } /// Acquires a reference to the underlying reader. pub fn get_ref(&self) -> &R { - self.inner.get_ref().get_ref() + self.0.get_ref() } /// Acquires a mutable reference to the underlying stream. @@ -729,29 +706,18 @@ impl MultiGzDecoder { /// Note that mutation of the stream may result in surprising results if /// this encoder is continued to be used. pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut().get_mut() + self.0.get_mut() } /// Consumes this decoder, returning the underlying reader. pub fn into_inner(self) -> R { - self.inner.into_inner().into_inner() + self.0.into_inner() } } impl Read for MultiGzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - if let Err(ref mut e) = self.header { - let another_error = io::ErrorKind::Other.into(); - return Err(mem::replace(e, another_error)); - } - match self.inner.read(into)? { - 0 => match self.finish_member() { - Ok(0) => Ok(0), - Ok(_) => self.read(into), - Err(e) => Err(e), - }, - n => Ok(n), - } + self.0.read(into) } } diff --git a/src/gz/read.rs b/src/gz/read.rs index 288c63bfd..32a87e297 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -144,6 +144,12 @@ impl GzDecoder { inner: bufread::GzDecoder::new2(BufReader::new(r)), } } + + pub fn multi(self, flag: bool) -> GzDecoder { + GzDecoder { + inner: self.inner.multi(flag) + } + } } impl GzDecoder { @@ -241,6 +247,14 @@ impl MultiGzDecoder { inner: bufread::MultiGzDecoder::new(BufReader::new(r)), } } + + /// Creates a new decoder from the given reader. + /// If the gzip stream contains multiple members all will be decoded. + pub fn new2(r: R) -> MultiGzDecoder { + MultiGzDecoder { + inner: bufread::MultiGzDecoder::new2(BufReader::new(r)), + } + } } impl MultiGzDecoder { diff --git a/tests/async-reader.rs b/tests/async-reader.rs index efef619c7..f9f2e27b4 100644 --- a/tests/async-reader.rs +++ b/tests/async-reader.rs @@ -76,3 +76,19 @@ fn test_gz_asyncread() { assert_eq!(content, expected); } + +#[test] +fn test_multi_gz_asyncread() { + let f = File::open("tests/multi.gz").unwrap(); + + let fut = read_to_end(AssertAsync(GzDecoder::new2(BadReader::new(f)).multi(true)), Vec::new()); + let (_, content) = AlwaysNotify(fut).wait().unwrap(); + + let mut expected = Vec::new(); + File::open("tests/multi.txt") + .unwrap() + .read_to_end(&mut expected) + .unwrap(); + + assert_eq!(content, expected); +}