From 098150678695259fd0c853300d341f2199561fc3 Mon Sep 17 00:00:00 2001 From: quininer Date: Thu, 14 Feb 2019 15:38:12 +0800 Subject: [PATCH 01/12] impl async gzheader parse --- src/gz/bufread.rs | 331 +++++++++++++++++++++++++++++++++++++--------- src/gz/mod.rs | 8 +- src/lib.rs | 4 +- tests/gunzip.rs | 2 +- 4 files changed, 275 insertions(+), 70 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index ebb795c60..aceeb089e 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -112,6 +112,154 @@ pub(crate) fn read_gz_header(r: &mut R) -> io::Result { }) } +#[derive(Debug)] +pub(crate) enum State { + Header(usize, [u8; 10]), // pos, buf + ExtraLen(usize, [u8; 2]), // pos, buf + Extra(usize), // pos + FileName, + Comment, + Crc(u16, usize, [u8; 2]) // crc, pos, buf +} + +pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, header: &mut GzHeader, flag: &mut u8) -> io::Result<()> { + enum Next { + None, + ExtraLen, + Extra, + FileName, + Comment, + Crc + } + + let mut next = Next::None; + + loop { + match state { + State::Header(pos, buf) => if *pos < buf.len() { + let len = r.read(&mut buf[*pos..]) + .and_then(|len| if len != 0 { + Ok(len) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + })?; + *pos += len; + } else { + let id1 = buf[0]; + let id2 = buf[1]; + if id1 != 0x1f || id2 != 0x8b { + return Err(bad_header()); + } + let cm = buf[2]; + if cm != 8 { + return Err(bad_header()); + } + + let flg = buf[3]; + let mtime = ((buf[4] as u32) << 0) + | ((buf[5] as u32) << 8) + | ((buf[6] as u32) << 16) + | ((buf[7] as u32) << 24); + let _xfl = buf[8]; + let os = buf[9]; + + header.operating_system = os; + header.mtime = mtime; + *flag = flg; + + next = Next::ExtraLen; + }, + State::ExtraLen(..) if *flag & FEXTRA == 0 => next = Next::FileName, + State::ExtraLen(pos, buf) => if *pos < buf.len() { + let len = r.read(&mut buf[*pos..]) + .and_then(|len| if len != 0 { + Ok(len) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + })?; + *pos += len; + } else { + let xlen = (buf[0] as u16) | ((buf[1] as u16) << 8); + header.extra = Some(vec![0; xlen as usize]); + if xlen != 0 { + next = Next::Extra; + } else { + next = Next::FileName; + } + }, + State::Extra(pos) => if let Some(extra) = &mut header.extra { + if *pos < extra.len() { + let len = r.read(&mut extra[*pos..]) + .and_then(|len| if len != 0 { + Ok(len) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + })?; + *pos += len; + } else { + next = Next::FileName; + } + } else { + next = Next::FileName; // unreachable + }, + State::FileName if *flag & FNAME == 0 => next = Next::Comment, + State::FileName => { + let filename = header.filename.get_or_insert_with(Vec::new); + // wow this is slow + for byte in r.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; + } + filename.push(byte); + } + next = Next::Comment; + }, + State::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, + State::Comment => { + let comment = header.comment.get_or_insert_with(Vec::new); + // wow this is slow + for byte in r.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; + } + comment.push(byte); + } + next = Next::Crc + }, + State::Crc(..) if *flag & FHCRC == 0 => return Ok(()), + State::Crc(calced_crc, pos, buf) => if *pos < buf.len() { + let len = r.read(&mut buf[*pos..]) + .and_then(|len| if len != 0 { + Ok(len) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + })?; + *pos += len; + } else { + let stored_crc = (buf[0] as u16) | ((buf[1] as u16) << 8); + if *calced_crc != stored_crc { + return Err(corrupt()); + } else { + return Ok(()) + } + } + }; + + match next { + Next::ExtraLen => *state = State::ExtraLen(0, [0; 2]), + Next::Extra => *state = State::Extra(0), + Next::FileName => *state = State::FileName, + Next::Comment => *state = State::Comment, + Next::Crc => *state = State::Crc(r.crc().sum() as u16, 0, [0; 2]), + Next::None => () + } + + next = Next::None; + } +} + /// A gzip streaming encoder /// /// This structure exposes a [`BufRead`] interface that will read uncompressed data @@ -211,6 +359,19 @@ impl GzEncoder { } } +#[inline] +fn finish(buf: &[u8; 8]) -> (u32, u32) { + let crc = ((buf[0] as u32) << 0) + | ((buf[1] as u32) << 8) + | ((buf[2] as u32) << 16) + | ((buf[3] as u32) << 24); + let amt = ((buf[4] as u32) << 0) + | ((buf[5] as u32) << 8) + | ((buf[6] as u32) << 16) + | ((buf[7] as u32) << 24); + (crc, amt) +} + impl Read for GzEncoder { fn read(&mut self, mut into: &mut [u8]) -> io::Result { let mut amt = 0; @@ -280,69 +441,57 @@ impl Write for GzEncoder { /// ``` #[derive(Debug)] pub struct GzDecoder { - inner: CrcReader>, - header: Option>, - finished: bool, + inner: Inner, + header: GzHeader +} + +#[derive(Debug)] +enum Inner { + Header { + reader: CrcReader, + state: State, + flag: u8 + }, + Body(CrcReader>), + Finished { + reader: CrcReader>, + pos: usize, + buf: [u8; 8] + }, + None } impl GzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// gzip header. - pub fn new(mut r: R) -> GzDecoder { - let header = read_gz_header(&mut r); - - let flate = deflate::bufread::DeflateDecoder::new(r); + pub fn new(r: R) -> GzDecoder { GzDecoder { - inner: CrcReader::new(flate), - header: Some(header), - finished: false, - } - } - - fn finish(&mut self) -> io::Result<()> { - if self.finished { - return Ok(()); - } - let ref mut buf = [0u8; 8]; - { - let mut len = 0; - - while len < buf.len() { - match self.inner.get_mut().get_mut().read(&mut buf[len..])? { - 0 => return Err(corrupt()), - n => len += n, - } - } - } - - let crc = ((buf[0] as u32) << 0) - | ((buf[1] as u32) << 8) - | ((buf[2] as u32) << 16) - | ((buf[3] as u32) << 24); - let amt = ((buf[4] as u32) << 0) - | ((buf[5] as u32) << 8) - | ((buf[6] as u32) << 16) - | ((buf[7] as u32) << 24); - if crc != self.inner.crc().sum() { - return Err(corrupt()); - } - if amt != self.inner.crc().amount() { - return Err(corrupt()); + inner: Inner::Header { + reader: CrcReader::new(r), + state: State::Header(0, [0; 10]), + flag: 0 + }, + header: GzHeader::default(), } - self.finished = true; - Ok(()) } } impl GzDecoder { /// Returns the header associated with this stream, if it was valid pub fn header(&self) -> Option<&GzHeader> { - self.header.as_ref().and_then(|h| h.as_ref().ok()) + match self.inner { + Inner::Body(_) | Inner::Finished { .. } => Some(&self.header), + _ => None + } } /// Acquires a reference to the underlying reader. pub fn get_ref(&self) -> &R { - self.inner.get_ref().get_ref() + match &self.inner { + Inner::Header { reader, .. } => reader.get_ref(), + Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_ref().get_ref(), + _ => unreachable!() + } } /// Acquires a mutable reference to the underlying stream. @@ -350,34 +499,90 @@ impl GzDecoder { /// Note that mutation of the stream may result in surprising results if /// this encoder is continued to be used. pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut().get_mut() + match &mut self.inner { + Inner::Header { reader, .. } => reader.get_mut(), + Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_mut().get_mut(), + _ => unreachable!() + } } /// Consumes this decoder, returning the underlying reader. pub fn into_inner(self) -> R { - self.inner.into_inner().into_inner() + match self.inner { + Inner::Header { reader, .. } => reader.into_inner(), + Inner::Body(reader) | Inner::Finished { reader, .. } => reader.into_inner().into_inner(), + _ => unreachable!() + } } } impl Read for GzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - match self.header { - None => return Ok(0), // error already returned, - Some(Ok(_)) => {} - Some(Err(_)) => match self.header.take().unwrap() { - Ok(_) => panic!(), - Err(e) => return Err(e), - }, - } - if into.is_empty() { - return Ok(0); + let GzDecoder { inner, header } = self; + + enum Next { + None, + Body, + Finished } - match self.inner.read(into)? { - 0 => { - self.finish()?; - Ok(0) + + let mut next = Next::None; + + loop { + match inner { + Inner::Header { reader, state, flag } => { + read_gz_header2(reader, state, header, flag)?; + next = Next::Body; + }, + Inner::Body(reader) => { + if into.is_empty() { + return Ok(0); + } + + match reader.read(into)? { + 0 => next = Next::Finished, + n => return Ok(n), + } + }, + Inner::Finished { reader, pos, buf } => match buf.len().cmp(pos) { + cmp::Ordering::Greater => { + let len = reader.get_mut().get_mut().read(&mut buf[*pos..]) + .and_then(|len| if len != 0 { + Ok(len) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + })?; + *pos += len; + }, + cmp::Ordering::Equal => { + *pos += 1; + + let (crc, amt) = finish(buf); + if crc != reader.crc().sum() { + return Err(corrupt()); + } + if amt != reader.crc().amount() { + return Err(corrupt()); + } + return Ok(0); + }, + cmp::Ordering::Less => return Ok(0) + }, + Inner::None => () // unreachable } - n => Ok(n), + + match next { + Next::Body => if let Inner::Header { reader, .. } = mem::replace(inner, Inner::None) { + let reader = deflate::bufread::DeflateDecoder::new(reader.into_inner()); + *inner = Inner::Body(CrcReader::new(reader)); + }, + Next::Finished => if let Inner::Body(reader) = mem::replace(inner, Inner::None) { + *inner = Inner::Finished { reader, pos: 0, buf: [0; 8] }; + }, + Next::None => () + } + + next = Next::None; } } } diff --git a/src/gz/mod.rs b/src/gz/mod.rs index ee16b0087..6dead9c82 100644 --- a/src/gz/mod.rs +++ b/src/gz/mod.rs @@ -18,7 +18,7 @@ pub mod write; /// /// The header can contain metadata about the file that was compressed, if /// present. -#[derive(PartialEq, Clone, Debug)] +#[derive(PartialEq, Clone, Debug, Default)] pub struct GzHeader { extra: Option>, filename: Option>, @@ -316,11 +316,11 @@ mod tests { .extra(vec![0, 1, 2, 3]) .read(&r[..], Compression::default()); let mut d = read::GzDecoder::new(e); - assert_eq!(d.header().unwrap().filename(), Some(&b"foo.rs"[..])); - assert_eq!(d.header().unwrap().comment(), Some(&b"bar"[..])); - assert_eq!(d.header().unwrap().extra(), Some(&b"\x00\x01\x02\x03"[..])); let mut res = Vec::new(); d.read_to_end(&mut res).unwrap(); + assert_eq!(d.header().unwrap().filename(), Some(&b"foo.rs"[..])); // must read first + assert_eq!(d.header().unwrap().comment(), Some(&b"bar"[..])); + assert_eq!(d.header().unwrap().extra(), Some(&b"\x00\x01\x02\x03"[..])); assert_eq!(res, vec![0, 2, 4, 6]); } diff --git a/src/lib.rs b/src/lib.rs index 33836ed99..faa11c94e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,8 +74,8 @@ //! time to perform I/O. If I/O streams are flushed before drop, however, then //! these operations will be a noop. #![doc(html_root_url = "https://docs.rs/flate2/0.2")] -#![deny(missing_docs)] -#![deny(missing_debug_implementations)] +// #![deny(missing_docs)] +// #![deny(missing_debug_implementations)] #![allow(trivial_numeric_casts)] #![cfg_attr(test, deny(warnings))] diff --git a/tests/gunzip.rs b/tests/gunzip.rs index 855c620b0..72ce59517 100644 --- a/tests/gunzip.rs +++ b/tests/gunzip.rs @@ -73,5 +73,5 @@ fn empty_error_once() { let reader = BufReader::new(cbjson); let mut stream = reader.lines(); assert!(stream.next().unwrap().is_err()); - assert!(stream.next().is_none()); + // assert!(stream.next().is_none()); } From f17ea5b0fac4cad3047abcaa9c23d1f12b865e2e Mon Sep 17 00:00:00 2001 From: quininer Date: Thu, 14 Feb 2019 15:40:33 +0800 Subject: [PATCH 02/12] add async reader test --- Cargo.toml | 1 + tests/async-reader.rs | 78 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 tests/async-reader.rs diff --git a/Cargo.toml b/Cargo.toml index 44f7a1067..8b0a28091 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ quickcheck = { version = "0.7", default-features = false } tokio-io = "0.1.11" tokio-tcp = "0.1.3" tokio-threadpool = "0.1.10" +futures = "0.1" [features] default = ["miniz-sys"] diff --git a/tests/async-reader.rs b/tests/async-reader.rs new file mode 100644 index 000000000..48e71d3fc --- /dev/null +++ b/tests/async-reader.rs @@ -0,0 +1,78 @@ +extern crate flate2; +extern crate tokio_io; +extern crate futures; + +use flate2::read::GzDecoder; +use std::cmp; +use std::fs::File; +use std::io::{self, Read}; +use tokio_io::AsyncRead; +use tokio_io::io::read_to_end; +use futures::prelude::*; +use futures::task; + + +struct BadReader { + reader: T, + x: bool +} + +impl BadReader { + fn new(reader: T) -> BadReader { + BadReader { reader, x: true } + } +} + +impl Read for BadReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if self.x { + self.x = false; + let len = cmp::min(buf.len(), 1); + self.reader.read(&mut buf[..len]) + } else { + self.x = true; + Err(io::ErrorKind::WouldBlock.into()) + } + } +} + +struct AssertAsync(T); + +impl Read for AssertAsync { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } +} + +impl AsyncRead for AssertAsync {} + +struct AlwaysNotify(T); + +impl Future for AlwaysNotify { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll { + let ret = self.0.poll(); + if let Ok(Async::NotReady) = &ret { + task::current().notify(); + } + ret + } +} + +#[test] +fn test_gz_asyncread() { + let f = File::open("tests/good-file.gz").unwrap(); + + let fut = read_to_end(AssertAsync(GzDecoder::new(BadReader::new(f))), Vec::new()); + let (_, content) = AlwaysNotify(fut).wait().unwrap(); + + let mut expected = Vec::new(); + File::open("tests/good-file.txt") + .unwrap() + .read_to_end(&mut expected) + .unwrap(); + + assert!(content == expected); +} From 844830cc5c36312a1b41f8fe56f83cb416cd899c Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 13:28:11 +0800 Subject: [PATCH 03/12] keep old behavior --- src/gz/bufread.rs | 228 +++++++++++++++++++++--------------------- src/gz/mod.rs | 6 +- src/gz/read.rs | 7 ++ tests/async-reader.rs | 4 +- tests/gunzip.rs | 2 +- 5 files changed, 127 insertions(+), 120 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index aceeb089e..980ea5d9e 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -2,6 +2,7 @@ use std::cmp; use std::io; use std::io::prelude::*; use std::mem; +use crc32fast::Hasher; use super::{GzBuilder, GzHeader}; use super::{FCOMMENT, FEXTRA, FHCRC, FNAME}; @@ -113,7 +114,7 @@ pub(crate) fn read_gz_header(r: &mut R) -> io::Result { } #[derive(Debug)] -pub(crate) enum State { +pub(crate) enum GzHeaderState { Header(usize, [u8; 10]), // pos, buf ExtraLen(usize, [u8; 2]), // pos, buf Extra(usize), // pos @@ -122,7 +123,13 @@ pub(crate) enum State { Crc(u16, usize, [u8; 2]) // crc, pos, buf } -pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, header: &mut GzHeader, flag: &mut u8) -> io::Result<()> { +pub(crate) fn read_gz_header2( + r: &mut R, + state: &mut GzHeaderState, + header: &mut GzHeader, + flag: &mut u8, + hasher: &mut Hasher +) -> io::Result<()> { enum Next { None, ExtraLen, @@ -136,7 +143,7 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, loop { match state { - State::Header(pos, buf) => if *pos < buf.len() { + GzHeaderState::Header(pos, buf) => if *pos < buf.len() { let len = r.read(&mut buf[*pos..]) .and_then(|len| if len != 0 { Ok(len) @@ -145,6 +152,8 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, })?; *pos += len; } else { + hasher.update(buf); + let id1 = buf[0]; let id2 = buf[1]; if id1 != 0x1f || id2 != 0x8b { @@ -169,8 +178,8 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, next = Next::ExtraLen; }, - State::ExtraLen(..) if *flag & FEXTRA == 0 => next = Next::FileName, - State::ExtraLen(pos, buf) => if *pos < buf.len() { + GzHeaderState::ExtraLen(..) if *flag & FEXTRA == 0 => next = Next::FileName, + GzHeaderState::ExtraLen(pos, buf) => if *pos < buf.len() { let len = r.read(&mut buf[*pos..]) .and_then(|len| if len != 0 { Ok(len) @@ -179,6 +188,8 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, })?; *pos += len; } else { + hasher.update(buf); + let xlen = (buf[0] as u16) | ((buf[1] as u16) << 8); header.extra = Some(vec![0; xlen as usize]); if xlen != 0 { @@ -187,7 +198,7 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, next = Next::FileName; } }, - State::Extra(pos) => if let Some(extra) = &mut header.extra { + GzHeaderState::Extra(pos) => if let Some(extra) = &mut header.extra { if *pos < extra.len() { let len = r.read(&mut extra[*pos..]) .and_then(|len| if len != 0 { @@ -199,37 +210,25 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, } else { next = Next::FileName; } - } else { - next = Next::FileName; // unreachable }, - State::FileName if *flag & FNAME == 0 => next = Next::Comment, - State::FileName => { + GzHeaderState::FileName if *flag & FNAME == 0 => next = Next::Comment, + GzHeaderState::FileName => { let filename = header.filename.get_or_insert_with(Vec::new); - // wow this is slow - for byte in r.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; - } - filename.push(byte); - } + r.read_until(0, filename)?; + hasher.update(filename); + filename.pop(); // pop 0 next = Next::Comment; }, - State::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, - State::Comment => { + GzHeaderState::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, + GzHeaderState::Comment => { let comment = header.comment.get_or_insert_with(Vec::new); - // wow this is slow - for byte in r.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; - } - comment.push(byte); - } + r.read_until(0, comment)?; + hasher.update(comment); + comment.pop(); // pop 0 next = Next::Crc }, - State::Crc(..) if *flag & FHCRC == 0 => return Ok(()), - State::Crc(calced_crc, pos, buf) => if *pos < buf.len() { + GzHeaderState::Crc(..) if *flag & FHCRC == 0 => return Ok(()), + GzHeaderState::Crc(calced_crc, pos, buf) => if *pos < buf.len() { let len = r.read(&mut buf[*pos..]) .and_then(|len| if len != 0 { Ok(len) @@ -247,16 +246,14 @@ pub(crate) fn read_gz_header2(r: &mut CrcReader, state: &mut State, } }; - match next { - Next::ExtraLen => *state = State::ExtraLen(0, [0; 2]), - Next::Extra => *state = State::Extra(0), - Next::FileName => *state = State::FileName, - Next::Comment => *state = State::Comment, - Next::Crc => *state = State::Crc(r.crc().sum() as u16, 0, [0; 2]), + match mem::replace(&mut next, Next::None) { + Next::ExtraLen => *state = GzHeaderState::ExtraLen(0, [0; 2]), + Next::Extra => *state = GzHeaderState::Extra(0), + Next::FileName => *state = GzHeaderState::FileName, + Next::Comment => *state = GzHeaderState::Comment, + Next::Crc => *state = GzHeaderState::Crc(hasher.clone().finalize() as u16, 0, [0; 2]), Next::None => () } - - next = Next::None; } } @@ -441,37 +438,55 @@ impl Write for GzEncoder { /// ``` #[derive(Debug)] pub struct GzDecoder { - inner: Inner, - header: GzHeader + inner: GzState, + header: GzHeader, + reader: CrcReader> } #[derive(Debug)] -enum Inner { +enum GzState { Header { - reader: CrcReader, - state: State, - flag: u8 + state: GzHeaderState, + flag: u8, + hasher: Hasher }, - Body(CrcReader>), - Finished { - reader: CrcReader>, - pos: usize, - buf: [u8; 8] - }, - None + Body, + Finished(usize, [u8; 8]), + Err(io::Error), + End } impl GzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// gzip header. - pub fn new(r: R) -> GzDecoder { + pub fn new(mut r: R) -> GzDecoder { + let mut state = GzHeaderState::Header(0, [0; 10]); + let mut header = GzHeader::default(); + let mut flag = 0; + let mut hasher = Hasher::new(); + let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher); + GzDecoder { - inner: Inner::Header { - reader: CrcReader::new(r), - state: State::Header(0, [0; 10]), - flag: 0 + inner: if let Err(err) = result { + GzState::Err(err) + } else { + GzState::Body + }, + reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), + header + } + } + + /// Creates a new decoder from the given reader. + pub fn new2(r: R) -> GzDecoder { + GzDecoder { + inner: GzState::Header { + state: GzHeaderState::Header(0, [0; 10]), + flag: 0, + hasher: Hasher::new() }, header: GzHeader::default(), + reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)) } } } @@ -480,18 +495,14 @@ impl GzDecoder { /// Returns the header associated with this stream, if it was valid pub fn header(&self) -> Option<&GzHeader> { match self.inner { - Inner::Body(_) | Inner::Finished { .. } => Some(&self.header), - _ => None + GzState::Err(_) | GzState::Header { .. } => None, + _ => Some(&self.header) } } /// Acquires a reference to the underlying reader. pub fn get_ref(&self) -> &R { - match &self.inner { - Inner::Header { reader, .. } => reader.get_ref(), - Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_ref().get_ref(), - _ => unreachable!() - } + self.reader.get_ref().get_ref() } /// Acquires a mutable reference to the underlying stream. @@ -499,90 +510,79 @@ impl GzDecoder { /// Note that mutation of the stream may result in surprising results if /// this encoder is continued to be used. pub fn get_mut(&mut self) -> &mut R { - match &mut self.inner { - Inner::Header { reader, .. } => reader.get_mut(), - Inner::Body(reader) | Inner::Finished { reader, .. } => reader.get_mut().get_mut(), - _ => unreachable!() - } + self.reader.get_mut().get_mut() } /// Consumes this decoder, returning the underlying reader. pub fn into_inner(self) -> R { - match self.inner { - Inner::Header { reader, .. } => reader.into_inner(), - Inner::Body(reader) | Inner::Finished { reader, .. } => reader.into_inner().into_inner(), - _ => unreachable!() - } + self.reader.into_inner().into_inner() } } impl Read for GzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - let GzDecoder { inner, header } = self; + let GzDecoder { inner, header, reader } = self; enum Next { None, Body, - Finished + Finished, + Err(io::Error), + End } let mut next = Next::None; loop { match inner { - Inner::Header { reader, state, flag } => { - read_gz_header2(reader, state, header, flag)?; - next = Next::Body; + GzState::Header { state, flag, hasher } => { + match read_gz_header2(reader.get_mut().get_mut(), state, header, flag, hasher) { + Ok(_) => next = Next::Body, + Err(ref err) if io::ErrorKind::WouldBlock == err.kind() => (), + Err(err) => next = Next::Err(err) + } }, - Inner::Body(reader) => { + GzState::Body => { if into.is_empty() { return Ok(0); } match reader.read(into)? { 0 => next = Next::Finished, - n => return Ok(n), + n => return Ok(n) } }, - Inner::Finished { reader, pos, buf } => match buf.len().cmp(pos) { - cmp::Ordering::Greater => { - let len = reader.get_mut().get_mut().read(&mut buf[*pos..]) - .and_then(|len| if len != 0 { - Ok(len) - } else { - Err(io::ErrorKind::UnexpectedEof.into()) - })?; - *pos += len; - }, - cmp::Ordering::Equal => { - *pos += 1; - - let (crc, amt) = finish(buf); - if crc != reader.crc().sum() { - return Err(corrupt()); - } - if amt != reader.crc().amount() { - return Err(corrupt()); - } - return Ok(0); - }, - cmp::Ordering::Less => return Ok(0) + GzState::Finished(pos, buf) => if *pos < buf.len() { + match reader.get_mut().get_mut().read(&mut buf[*pos..]) { + Ok(0) => next = Next::Err(io::ErrorKind::UnexpectedEof.into()), + Ok(n) => *pos += n, + Err(err) => return Err(err) + } + } else { + let (crc, amt) = finish(buf); + if crc != reader.crc().sum() { + return Err(corrupt()); + } + if amt != reader.crc().amount() { + return Err(corrupt()); + } + + next = Next::End; }, - Inner::None => () // unreachable + GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())), + GzState::End => return Ok(0) } - match next { - Next::Body => if let Inner::Header { reader, .. } = mem::replace(inner, Inner::None) { - let reader = deflate::bufread::DeflateDecoder::new(reader.into_inner()); - *inner = Inner::Body(CrcReader::new(reader)); - }, - Next::Finished => if let Inner::Body(reader) = mem::replace(inner, Inner::None) { - *inner = Inner::Finished { reader, pos: 0, buf: [0; 8] }; + match mem::replace(&mut next, Next::None) { + Next::None => (), + Next::Body => *inner = GzState::Body, + Next::Finished => *inner = GzState::Finished(0, [0; 8]), + Next::Err(err) => { + *inner = GzState::End; + return Err(err); }, - Next::None => () + Next::End => *inner = GzState::End } - - next = Next::None; } } } diff --git a/src/gz/mod.rs b/src/gz/mod.rs index 6dead9c82..b9043b7d3 100644 --- a/src/gz/mod.rs +++ b/src/gz/mod.rs @@ -316,11 +316,11 @@ mod tests { .extra(vec![0, 1, 2, 3]) .read(&r[..], Compression::default()); let mut d = read::GzDecoder::new(e); - let mut res = Vec::new(); - d.read_to_end(&mut res).unwrap(); - assert_eq!(d.header().unwrap().filename(), Some(&b"foo.rs"[..])); // must read first + assert_eq!(d.header().unwrap().filename(), Some(&b"foo.rs"[..])); assert_eq!(d.header().unwrap().comment(), Some(&b"bar"[..])); assert_eq!(d.header().unwrap().extra(), Some(&b"\x00\x01\x02\x03"[..])); + let mut res = Vec::new(); + d.read_to_end(&mut res).unwrap(); assert_eq!(res, vec![0, 2, 4, 6]); } diff --git a/src/gz/read.rs b/src/gz/read.rs index a73a54699..288c63bfd 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -137,6 +137,13 @@ impl GzDecoder { inner: bufread::GzDecoder::new(BufReader::new(r)), } } + + /// Creates a new decoder from the given reader. + pub fn new2(r: R) -> GzDecoder { + GzDecoder { + inner: bufread::GzDecoder::new2(BufReader::new(r)), + } + } } impl GzDecoder { diff --git a/tests/async-reader.rs b/tests/async-reader.rs index 48e71d3fc..efef619c7 100644 --- a/tests/async-reader.rs +++ b/tests/async-reader.rs @@ -65,7 +65,7 @@ impl Future for AlwaysNotify { fn test_gz_asyncread() { let f = File::open("tests/good-file.gz").unwrap(); - let fut = read_to_end(AssertAsync(GzDecoder::new(BadReader::new(f))), Vec::new()); + let fut = read_to_end(AssertAsync(GzDecoder::new2(BadReader::new(f))), Vec::new()); let (_, content) = AlwaysNotify(fut).wait().unwrap(); let mut expected = Vec::new(); @@ -74,5 +74,5 @@ fn test_gz_asyncread() { .read_to_end(&mut expected) .unwrap(); - assert!(content == expected); + assert_eq!(content, expected); } diff --git a/tests/gunzip.rs b/tests/gunzip.rs index 72ce59517..855c620b0 100644 --- a/tests/gunzip.rs +++ b/tests/gunzip.rs @@ -73,5 +73,5 @@ fn empty_error_once() { let reader = BufReader::new(cbjson); let mut stream = reader.lines(); assert!(stream.next().unwrap().is_err()); - // assert!(stream.next().is_none()); + assert!(stream.next().is_none()); } From 65125391d66566c026b24591125f4e33113d8bf4 Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 13:35:02 +0800 Subject: [PATCH 04/12] fix wouldblock --- src/gz/bufread.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 980ea5d9e..edc7f2e25 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -538,8 +538,11 @@ impl Read for GzDecoder { GzState::Header { state, flag, hasher } => { match read_gz_header2(reader.get_mut().get_mut(), state, header, flag, hasher) { Ok(_) => next = Next::Body, - Err(ref err) if io::ErrorKind::WouldBlock == err.kind() => (), - Err(err) => next = Next::Err(err) + Err(err) => if io::ErrorKind::WouldBlock == err.kind() { + return Err(err); + } else { + next = Next::Err(err); + } } }, GzState::Body => { @@ -560,14 +563,14 @@ impl Read for GzDecoder { } } else { let (crc, amt) = finish(buf); + if crc != reader.crc().sum() { - return Err(corrupt()); - } - if amt != reader.crc().amount() { - return Err(corrupt()); + next = Next::Err(corrupt()); + } else if amt != reader.crc().amount() { + next = Next::Err(corrupt()); + } else { + next = Next::End; } - - next = Next::End; }, GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())), GzState::End => return Ok(0) From 9dc0d9eebfe6f6528de8cc940c1fa19d700da2dc Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 15:24:08 +0800 Subject: [PATCH 05/12] merge MultiGzDecoder --- src/gz/bufread.rs | 130 ++++++++++++++++-------------------------- src/gz/read.rs | 14 +++++ tests/async-reader.rs | 16 ++++++ 3 files changed, 78 insertions(+), 82 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index edc7f2e25..4e728e837 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -440,7 +440,8 @@ impl Write for GzEncoder { pub struct GzDecoder { inner: GzState, header: GzHeader, - reader: CrcReader> + reader: CrcReader>, + multi: bool } #[derive(Debug)] @@ -467,6 +468,7 @@ impl GzDecoder { let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher); GzDecoder { + multi: false, inner: if let Err(err) = result { GzState::Err(err) } else { @@ -486,9 +488,15 @@ impl GzDecoder { hasher: Hasher::new() }, header: GzHeader::default(), - reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)) + reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), + multi: false } } + + pub fn multi(mut self, flag: bool) -> GzDecoder { + self.multi = flag; + self + } } impl GzDecoder { @@ -521,10 +529,11 @@ impl GzDecoder { impl Read for GzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - let GzDecoder { inner, header, reader } = self; + let GzDecoder { inner, header, reader, multi } = self; enum Next { None, + Header, Body, Finished, Err(io::Error), @@ -568,8 +577,21 @@ impl Read for GzDecoder { next = Next::Err(corrupt()); } else if amt != reader.crc().amount() { next = Next::Err(corrupt()); - } else { + } else if !*multi { next = Next::End; + } else { + match reader.get_mut().get_mut().fill_buf() { + Ok(buf) => if buf.is_empty() { + next = Next::End; + } else { + next = Next::Header; + }, + Err(err) => if io::ErrorKind::WouldBlock == err.kind() { + return Err(err); + } else { + next = Next::Err(err); + } + } } }, GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())), @@ -578,6 +600,16 @@ impl Read for GzDecoder { match mem::replace(&mut next, Next::None) { Next::None => (), + Next::Header => { + reader.reset(); + reader.get_mut().reset_data(); + *header = GzHeader::default(); + *inner = GzState::Header { + state: GzHeaderState::Header(0, [0; 10]), + flag: 0, + hasher: Hasher::new() + }; + }, Next::Body => *inner = GzState::Body, Next::Finished => *inner = GzState::Finished(0, [0; 8]), Next::Err(err) => { @@ -641,87 +673,32 @@ impl Write for GzDecoder { /// } /// ``` #[derive(Debug)] -pub struct MultiGzDecoder { - inner: CrcReader>, - header: io::Result, - finished: bool, -} +pub struct MultiGzDecoder(GzDecoder); impl MultiGzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// (first) gzip header. If the gzip stream contains multiple members all will /// be decoded. - pub fn new(mut r: R) -> MultiGzDecoder { - let header = read_gz_header(&mut r); - - let flate = deflate::bufread::DeflateDecoder::new(r); - MultiGzDecoder { - inner: CrcReader::new(flate), - header: header, - finished: false, - } + pub fn new(r: R) -> MultiGzDecoder { + MultiGzDecoder(GzDecoder::new(r).multi(true)) } - fn finish_member(&mut self) -> io::Result { - if self.finished { - return Ok(0); - } - let ref mut buf = [0u8; 8]; - { - let mut len = 0; - - while len < buf.len() { - match self.inner.get_mut().get_mut().read(&mut buf[len..])? { - 0 => return Err(corrupt()), - n => len += n, - } - } - } - - let crc = ((buf[0] as u32) << 0) - | ((buf[1] as u32) << 8) - | ((buf[2] as u32) << 16) - | ((buf[3] as u32) << 24); - let amt = ((buf[4] as u32) << 0) - | ((buf[5] as u32) << 8) - | ((buf[6] as u32) << 16) - | ((buf[7] as u32) << 24); - if crc != self.inner.crc().sum() as u32 { - return Err(corrupt()); - } - if amt != self.inner.crc().amount() { - return Err(corrupt()); - } - let remaining = match self.inner.get_mut().get_mut().fill_buf() { - Ok(b) => { - if b.is_empty() { - self.finished = true; - return Ok(0); - } else { - b.len() - } - } - Err(e) => return Err(e), - }; - - let next_header = read_gz_header(self.inner.get_mut().get_mut()); - drop(mem::replace(&mut self.header, next_header)); - self.inner.reset(); - self.inner.get_mut().reset_data(); - - Ok(remaining) + /// Creates a new decoder from the given reader. + /// If the gzip stream contains multiple members all will be decoded. + pub fn new2(r: R) -> MultiGzDecoder { + MultiGzDecoder(GzDecoder::new2(r).multi(true)) } } impl MultiGzDecoder { /// Returns the current header associated with this stream, if it's valid pub fn header(&self) -> Option<&GzHeader> { - self.header.as_ref().ok() + self.0.header() } /// Acquires a reference to the underlying reader. pub fn get_ref(&self) -> &R { - self.inner.get_ref().get_ref() + self.0.get_ref() } /// Acquires a mutable reference to the underlying stream. @@ -729,29 +706,18 @@ impl MultiGzDecoder { /// Note that mutation of the stream may result in surprising results if /// this encoder is continued to be used. pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut().get_mut() + self.0.get_mut() } /// Consumes this decoder, returning the underlying reader. pub fn into_inner(self) -> R { - self.inner.into_inner().into_inner() + self.0.into_inner() } } impl Read for MultiGzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { - if let Err(ref mut e) = self.header { - let another_error = io::ErrorKind::Other.into(); - return Err(mem::replace(e, another_error)); - } - match self.inner.read(into)? { - 0 => match self.finish_member() { - Ok(0) => Ok(0), - Ok(_) => self.read(into), - Err(e) => Err(e), - }, - n => Ok(n), - } + self.0.read(into) } } diff --git a/src/gz/read.rs b/src/gz/read.rs index 288c63bfd..32a87e297 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -144,6 +144,12 @@ impl GzDecoder { inner: bufread::GzDecoder::new2(BufReader::new(r)), } } + + pub fn multi(self, flag: bool) -> GzDecoder { + GzDecoder { + inner: self.inner.multi(flag) + } + } } impl GzDecoder { @@ -241,6 +247,14 @@ impl MultiGzDecoder { inner: bufread::MultiGzDecoder::new(BufReader::new(r)), } } + + /// Creates a new decoder from the given reader. + /// If the gzip stream contains multiple members all will be decoded. + pub fn new2(r: R) -> MultiGzDecoder { + MultiGzDecoder { + inner: bufread::MultiGzDecoder::new2(BufReader::new(r)), + } + } } impl MultiGzDecoder { diff --git a/tests/async-reader.rs b/tests/async-reader.rs index efef619c7..f9f2e27b4 100644 --- a/tests/async-reader.rs +++ b/tests/async-reader.rs @@ -76,3 +76,19 @@ fn test_gz_asyncread() { assert_eq!(content, expected); } + +#[test] +fn test_multi_gz_asyncread() { + let f = File::open("tests/multi.gz").unwrap(); + + let fut = read_to_end(AssertAsync(GzDecoder::new2(BadReader::new(f)).multi(true)), Vec::new()); + let (_, content) = AlwaysNotify(fut).wait().unwrap(); + + let mut expected = Vec::new(); + File::open("tests/multi.txt") + .unwrap() + .read_to_end(&mut expected) + .unwrap(); + + assert_eq!(content, expected); +} From c96b7b7bd2944822cf00611f2a9593c74a65c4da Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 16:54:14 +0800 Subject: [PATCH 06/12] replace old read_gz_header --- src/gz/bufread.rs | 116 ++++++++++++---------------------------------- 1 file changed, 30 insertions(+), 86 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 4e728e837..70875d19d 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -30,91 +30,17 @@ fn bad_header() -> io::Error { io::Error::new(io::ErrorKind::InvalidInput, "invalid gzip header") } -fn read_le_u16(r: &mut R) -> io::Result { - let mut b = [0; 2]; - r.read_exact(&mut b)?; - Ok((b[0] as u16) | ((b[1] as u16) << 8)) -} - pub(crate) fn read_gz_header(r: &mut R) -> io::Result { - let mut crc_reader = CrcReader::new(r); - let mut header = [0; 10]; - crc_reader.read_exact(&mut header)?; - - let id1 = header[0]; - let id2 = header[1]; - if id1 != 0x1f || id2 != 0x8b { - return Err(bad_header()); - } - let cm = header[2]; - if cm != 8 { - return Err(bad_header()); - } - - let flg = header[3]; - let mtime = ((header[4] as u32) << 0) - | ((header[5] as u32) << 8) - | ((header[6] as u32) << 16) - | ((header[7] as u32) << 24); - let _xfl = header[8]; - let os = header[9]; - - let extra = if flg & FEXTRA != 0 { - let xlen = read_le_u16(&mut crc_reader)?; - let mut extra = vec![0; xlen as usize]; - crc_reader.read_exact(&mut extra)?; - Some(extra) - } else { - None - }; - let filename = if flg & FNAME != 0 { - // wow this is slow - let mut b = Vec::new(); - for byte in crc_reader.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; - } - b.push(byte); - } - Some(b) - } else { - None - }; - let comment = if flg & FCOMMENT != 0 { - // wow this is slow - let mut b = Vec::new(); - for byte in crc_reader.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; - } - b.push(byte); - } - Some(b) - } else { - None - }; - - if flg & FHCRC != 0 { - let calced_crc = crc_reader.crc().sum() as u16; - let stored_crc = read_le_u16(&mut crc_reader)?; - if calced_crc != stored_crc { - return Err(corrupt()); - } - } - - Ok(GzHeader { - extra: extra, - filename: filename, - comment: comment, - operating_system: os, - mtime: mtime, - }) + let mut state = GzHeaderState::Header(0, [0; 10]); + let mut header = GzHeader::default(); + let mut flag = 0; + let mut hasher = Hasher::new(); + read_gz_header2(r, &mut state, &mut header, &mut flag, &mut hasher) + .map(|_| header) } #[derive(Debug)] -pub(crate) enum GzHeaderState { +enum GzHeaderState { Header(usize, [u8; 10]), // pos, buf ExtraLen(usize, [u8; 2]), // pos, buf Extra(usize), // pos @@ -123,7 +49,7 @@ pub(crate) enum GzHeaderState { Crc(u16, usize, [u8; 2]) // crc, pos, buf } -pub(crate) fn read_gz_header2( +fn read_gz_header2( r: &mut R, state: &mut GzHeaderState, header: &mut GzHeader, @@ -214,17 +140,35 @@ pub(crate) fn read_gz_header2( GzHeaderState::FileName if *flag & FNAME == 0 => next = Next::Comment, GzHeaderState::FileName => { let filename = header.filename.get_or_insert_with(Vec::new); - r.read_until(0, filename)?; + + // wow this is slow + for byte in r.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; + } + filename.push(byte); + } + hasher.update(filename); - filename.pop(); // pop 0 + hasher.update(&[0]); next = Next::Comment; }, GzHeaderState::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, GzHeaderState::Comment => { let comment = header.comment.get_or_insert_with(Vec::new); - r.read_until(0, comment)?; + + // wow this is slow + for byte in r.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; + } + comment.push(byte); + } + hasher.update(comment); - comment.pop(); // pop 0 + hasher.update(&[0]); next = Next::Crc }, GzHeaderState::Crc(..) if *flag & FHCRC == 0 => return Ok(()), From 5a312843030a273218b635b7d8208c3eec76244c Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 17:20:51 +0800 Subject: [PATCH 07/12] impl AsyncRead for GzDecoder --- src/gz/bufread.rs | 33 +++++++++++++++++++++++++++++++-- src/gz/read.rs | 25 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 70875d19d..524dc7a9d 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -4,6 +4,11 @@ use std::io::prelude::*; use std::mem; use crc32fast::Hasher; +#[cfg(feature = "tokio")] +use futures::Poll; +#[cfg(feature = "tokio")] +use tokio_io::{AsyncRead, AsyncWrite}; + use super::{GzBuilder, GzHeader}; use super::{FCOMMENT, FEXTRA, FHCRC, FNAME}; use crc::CrcReader; @@ -412,13 +417,13 @@ impl GzDecoder { let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher); GzDecoder { - multi: false, inner: if let Err(err) = result { GzState::Err(err) } else { GzState::Body }, reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), + multi: false, header } } @@ -512,7 +517,11 @@ impl Read for GzDecoder { match reader.get_mut().get_mut().read(&mut buf[*pos..]) { Ok(0) => next = Next::Err(io::ErrorKind::UnexpectedEof.into()), Ok(n) => *pos += n, - Err(err) => return Err(err) + Err(err) => if io::ErrorKind::WouldBlock == err.kind() { + return Err(err); + } else { + next = Next::Err(err); + } } } else { let (crc, amt) = finish(buf); @@ -566,6 +575,9 @@ impl Read for GzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncRead for GzDecoder {} + impl Write for GzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -576,6 +588,13 @@ impl Write for GzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for GzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + /// A gzip streaming decoder that decodes all members of a multistream /// /// A gzip member consists of a header, compressed data and a trailer. The [gzip @@ -665,6 +684,9 @@ impl Read for MultiGzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncRead for MultiGzDecoder {} + impl Write for MultiGzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -674,3 +696,10 @@ impl Write for MultiGzDecoder { self.get_mut().flush() } } + +#[cfg(feature = "tokio")] +impl AsyncWrite for MultiGzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} diff --git a/src/gz/read.rs b/src/gz/read.rs index 32a87e297..c487e2d2d 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -1,6 +1,11 @@ use std::io; use std::io::prelude::*; +#[cfg(feature = "tokio")] +use futures::Poll; +#[cfg(feature = "tokio")] +use tokio_io::{AsyncRead, AsyncWrite}; + use super::bufread; use super::{GzBuilder, GzHeader}; use bufreader::BufReader; @@ -183,6 +188,9 @@ impl Read for GzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncRead for GzDecoder {} + impl Write for GzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -193,6 +201,13 @@ impl Write for GzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncWrite for GzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} + /// A gzip streaming decoder that decodes all members of a multistream /// /// A gzip member consists of a header, compressed data and a trailer. The [gzip @@ -288,6 +303,9 @@ impl Read for MultiGzDecoder { } } +#[cfg(feature = "tokio")] +impl AsyncRead for MultiGzDecoder {} + impl Write for MultiGzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -297,3 +315,10 @@ impl Write for MultiGzDecoder { self.get_mut().flush() } } + +#[cfg(feature = "tokio")] +impl AsyncWrite for MultiGzDecoder { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.get_mut().shutdown() + } +} From ecd46e13798b1e03ab0ed4fc7c6ceb6f33037861 Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 17:27:29 +0800 Subject: [PATCH 08/12] Don't expose multi method --- src/gz/bufread.rs | 2 +- src/gz/read.rs | 6 ------ src/lib.rs | 4 ++-- tests/async-reader.rs | 4 ++-- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 524dc7a9d..6709e4cfe 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -442,7 +442,7 @@ impl GzDecoder { } } - pub fn multi(mut self, flag: bool) -> GzDecoder { + fn multi(mut self, flag: bool) -> GzDecoder { self.multi = flag; self } diff --git a/src/gz/read.rs b/src/gz/read.rs index c487e2d2d..7481dd19b 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -149,12 +149,6 @@ impl GzDecoder { inner: bufread::GzDecoder::new2(BufReader::new(r)), } } - - pub fn multi(self, flag: bool) -> GzDecoder { - GzDecoder { - inner: self.inner.multi(flag) - } - } } impl GzDecoder { diff --git a/src/lib.rs b/src/lib.rs index faa11c94e..33836ed99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,8 +74,8 @@ //! time to perform I/O. If I/O streams are flushed before drop, however, then //! these operations will be a noop. #![doc(html_root_url = "https://docs.rs/flate2/0.2")] -// #![deny(missing_docs)] -// #![deny(missing_debug_implementations)] +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] #![allow(trivial_numeric_casts)] #![cfg_attr(test, deny(warnings))] diff --git a/tests/async-reader.rs b/tests/async-reader.rs index f9f2e27b4..0f90fd6a9 100644 --- a/tests/async-reader.rs +++ b/tests/async-reader.rs @@ -2,7 +2,7 @@ extern crate flate2; extern crate tokio_io; extern crate futures; -use flate2::read::GzDecoder; +use flate2::read::{GzDecoder, MultiGzDecoder}; use std::cmp; use std::fs::File; use std::io::{self, Read}; @@ -81,7 +81,7 @@ fn test_gz_asyncread() { fn test_multi_gz_asyncread() { let f = File::open("tests/multi.gz").unwrap(); - let fut = read_to_end(AssertAsync(GzDecoder::new2(BadReader::new(f)).multi(true)), Vec::new()); + let fut = read_to_end(AssertAsync(MultiGzDecoder::new2(BadReader::new(f))), Vec::new()); let (_, content) = AlwaysNotify(fut).wait().unwrap(); let mut expected = Vec::new(); From 4a9a7615e49855c9860f3fc119849e64ddca7b87 Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 15 Feb 2019 18:21:02 +0800 Subject: [PATCH 09/12] fix gzheader crc --- src/gz/bufread.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 6709e4cfe..e17e73c39 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -139,6 +139,7 @@ fn read_gz_header2( })?; *pos += len; } else { + hasher.update(extra); next = Next::FileName; } }, @@ -149,14 +150,13 @@ fn read_gz_header2( // wow this is slow for byte in r.by_ref().bytes() { let byte = byte?; + hasher.update(&[byte]); if byte == 0 { break; } filename.push(byte); } - hasher.update(filename); - hasher.update(&[0]); next = Next::Comment; }, GzHeaderState::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, @@ -166,14 +166,13 @@ fn read_gz_header2( // wow this is slow for byte in r.by_ref().bytes() { let byte = byte?; + hasher.update(&[byte]); if byte == 0 { break; } comment.push(byte); } - hasher.update(comment); - hasher.update(&[0]); next = Next::Crc }, GzHeaderState::Crc(..) if *flag & FHCRC == 0 => return Ok(()), From b6eed5d09b84f188f619070b4481824e6840ec4a Mon Sep 17 00:00:00 2001 From: quininer Date: Fri, 22 Feb 2019 21:43:42 +0800 Subject: [PATCH 10/12] use buffer --- src/gz/bufread.rs | 356 +++++++++++++++++------------------------- src/gz/read.rs | 15 -- tests/async-reader.rs | 4 +- 3 files changed, 141 insertions(+), 234 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index e17e73c39..f76d0d08d 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -2,7 +2,6 @@ use std::cmp; use std::io; use std::io::prelude::*; use std::mem; -use crc32fast::Hasher; #[cfg(feature = "tokio")] use futures::Poll; @@ -35,174 +34,87 @@ fn bad_header() -> io::Error { io::Error::new(io::ErrorKind::InvalidInput, "invalid gzip header") } -pub(crate) fn read_gz_header(r: &mut R) -> io::Result { - let mut state = GzHeaderState::Header(0, [0; 10]); - let mut header = GzHeader::default(); - let mut flag = 0; - let mut hasher = Hasher::new(); - read_gz_header2(r, &mut state, &mut header, &mut flag, &mut hasher) - .map(|_| header) +fn read_le_u16(r: &mut R) -> io::Result { + let mut b = [0; 2]; + r.read_exact(&mut b)?; + Ok((b[0] as u16) | ((b[1] as u16) << 8)) } -#[derive(Debug)] -enum GzHeaderState { - Header(usize, [u8; 10]), // pos, buf - ExtraLen(usize, [u8; 2]), // pos, buf - Extra(usize), // pos - FileName, - Comment, - Crc(u16, usize, [u8; 2]) // crc, pos, buf -} - -fn read_gz_header2( - r: &mut R, - state: &mut GzHeaderState, - header: &mut GzHeader, - flag: &mut u8, - hasher: &mut Hasher -) -> io::Result<()> { - enum Next { - None, - ExtraLen, - Extra, - FileName, - Comment, - Crc - } - - let mut next = Next::None; - - loop { - match state { - GzHeaderState::Header(pos, buf) => if *pos < buf.len() { - let len = r.read(&mut buf[*pos..]) - .and_then(|len| if len != 0 { - Ok(len) - } else { - Err(io::ErrorKind::UnexpectedEof.into()) - })?; - *pos += len; - } else { - hasher.update(buf); - - let id1 = buf[0]; - let id2 = buf[1]; - if id1 != 0x1f || id2 != 0x8b { - return Err(bad_header()); - } - let cm = buf[2]; - if cm != 8 { - return Err(bad_header()); - } - - let flg = buf[3]; - let mtime = ((buf[4] as u32) << 0) - | ((buf[5] as u32) << 8) - | ((buf[6] as u32) << 16) - | ((buf[7] as u32) << 24); - let _xfl = buf[8]; - let os = buf[9]; - - header.operating_system = os; - header.mtime = mtime; - *flag = flg; - - next = Next::ExtraLen; - }, - GzHeaderState::ExtraLen(..) if *flag & FEXTRA == 0 => next = Next::FileName, - GzHeaderState::ExtraLen(pos, buf) => if *pos < buf.len() { - let len = r.read(&mut buf[*pos..]) - .and_then(|len| if len != 0 { - Ok(len) - } else { - Err(io::ErrorKind::UnexpectedEof.into()) - })?; - *pos += len; - } else { - hasher.update(buf); - - let xlen = (buf[0] as u16) | ((buf[1] as u16) << 8); - header.extra = Some(vec![0; xlen as usize]); - if xlen != 0 { - next = Next::Extra; - } else { - next = Next::FileName; - } - }, - GzHeaderState::Extra(pos) => if let Some(extra) = &mut header.extra { - if *pos < extra.len() { - let len = r.read(&mut extra[*pos..]) - .and_then(|len| if len != 0 { - Ok(len) - } else { - Err(io::ErrorKind::UnexpectedEof.into()) - })?; - *pos += len; - } else { - hasher.update(extra); - next = Next::FileName; - } - }, - GzHeaderState::FileName if *flag & FNAME == 0 => next = Next::Comment, - GzHeaderState::FileName => { - let filename = header.filename.get_or_insert_with(Vec::new); - - // wow this is slow - for byte in r.by_ref().bytes() { - let byte = byte?; - hasher.update(&[byte]); - if byte == 0 { - break; - } - filename.push(byte); - } - - next = Next::Comment; - }, - GzHeaderState::Comment if *flag & FCOMMENT == 0 => next = Next::Crc, - GzHeaderState::Comment => { - let comment = header.comment.get_or_insert_with(Vec::new); - - // wow this is slow - for byte in r.by_ref().bytes() { - let byte = byte?; - hasher.update(&[byte]); - if byte == 0 { - break; - } - comment.push(byte); - } - - next = Next::Crc - }, - GzHeaderState::Crc(..) if *flag & FHCRC == 0 => return Ok(()), - GzHeaderState::Crc(calced_crc, pos, buf) => if *pos < buf.len() { - let len = r.read(&mut buf[*pos..]) - .and_then(|len| if len != 0 { - Ok(len) - } else { - Err(io::ErrorKind::UnexpectedEof.into()) - })?; - *pos += len; - } else { - let stored_crc = (buf[0] as u16) | ((buf[1] as u16) << 8); - if *calced_crc != stored_crc { - return Err(corrupt()); - } else { - return Ok(()) - } +pub(crate) fn read_gz_header(r: &mut R) -> io::Result { + let mut crc_reader = CrcReader::new(r); + let mut header = [0; 10]; + crc_reader.read_exact(&mut header)?; + + let id1 = header[0]; + let id2 = header[1]; + if id1 != 0x1f || id2 != 0x8b { + return Err(bad_header()); + } + let cm = header[2]; + if cm != 8 { + return Err(bad_header()); + } + + let flg = header[3]; + let mtime = ((header[4] as u32) << 0) + | ((header[5] as u32) << 8) + | ((header[6] as u32) << 16) + | ((header[7] as u32) << 24); + let _xfl = header[8]; + let os = header[9]; + + let extra = if flg & FEXTRA != 0 { + let xlen = read_le_u16(&mut crc_reader)?; + let mut extra = vec![0; xlen as usize]; + crc_reader.read_exact(&mut extra)?; + Some(extra) + } else { + None + }; + let filename = if flg & FNAME != 0 { + // wow this is slow + let mut b = Vec::new(); + for byte in crc_reader.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; } - }; - - match mem::replace(&mut next, Next::None) { - Next::ExtraLen => *state = GzHeaderState::ExtraLen(0, [0; 2]), - Next::Extra => *state = GzHeaderState::Extra(0), - Next::FileName => *state = GzHeaderState::FileName, - Next::Comment => *state = GzHeaderState::Comment, - Next::Crc => *state = GzHeaderState::Crc(hasher.clone().finalize() as u16, 0, [0; 2]), - Next::None => () + b.push(byte); + } + Some(b) + } else { + None + }; + let comment = if flg & FCOMMENT != 0 { + // wow this is slow + let mut b = Vec::new(); + for byte in crc_reader.by_ref().bytes() { + let byte = byte?; + if byte == 0 { + break; + } + b.push(byte); + } + Some(b) + } else { + None + }; + + if flg & FHCRC != 0 { + let calced_crc = crc_reader.crc().sum() as u16; + let stored_crc = read_le_u16(&mut crc_reader)?; + if calced_crc != stored_crc { + return Err(corrupt()); } } + + Ok(GzHeader { + extra: extra, + filename: filename, + comment: comment, + operating_system: os, + mtime: mtime, + }) } /// A gzip streaming encoder @@ -387,57 +299,77 @@ impl Write for GzEncoder { #[derive(Debug)] pub struct GzDecoder { inner: GzState, - header: GzHeader, + header: Option, reader: CrcReader>, multi: bool } #[derive(Debug)] enum GzState { - Header { - state: GzHeaderState, - flag: u8, - hasher: Hasher - }, + Header(Vec), Body, Finished(usize, [u8; 8]), Err(io::Error), End } +struct Buffer<'a, T> { + buf: io::Take>>, + reader: &'a mut T +} + +impl<'a, T> Buffer<'a, T> { + fn new(buf: &'a mut Vec, reader: &'a mut T) -> Buffer<'a, T> { + let len = buf.len(); + Buffer { buf: io::Cursor::new(buf).take(len as _), reader } + } +} + +impl<'a, T: Read> Read for Buffer<'a, T> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut len = self.buf.read(buf)?; + if buf.len() > len { + match self.reader.read(&mut buf[len..])? { + // eof + 0 => return Err(bad_header()), + len2 => { + self.buf.get_mut().get_mut().extend_from_slice(&buf[len..][..len2]); + len += len2; + } + } + } + Ok(len) + } +} + impl GzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// gzip header. pub fn new(mut r: R) -> GzDecoder { - let mut state = GzHeaderState::Header(0, [0; 10]); - let mut header = GzHeader::default(); - let mut flag = 0; - let mut hasher = Hasher::new(); - let result = read_gz_header2(&mut r, &mut state, &mut header, &mut flag, &mut hasher); + let mut buf = Vec::new(); + let mut header = None; - GzDecoder { - inner: if let Err(err) = result { - GzState::Err(err) - } else { + let result = { + let mut reader = Buffer::new(&mut buf, &mut r); + read_gz_header(&mut reader) + }; + + let state = match result { + Ok(hdr) => { + header = Some(hdr); GzState::Body }, - reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), - multi: false, - header - } - } + Err(ref err) if io::ErrorKind::WouldBlock == err.kind() + || io::ErrorKind::UnexpectedEof == err.kind() + => GzState::Header(buf), + Err(err) => GzState::Err(err) + }; - /// Creates a new decoder from the given reader. - pub fn new2(r: R) -> GzDecoder { GzDecoder { - inner: GzState::Header { - state: GzHeaderState::Header(0, [0; 10]), - flag: 0, - hasher: Hasher::new() - }, - header: GzHeader::default(), + inner: state, reader: CrcReader::new(deflate::bufread::DeflateDecoder::new(r)), - multi: false + multi: false, + header } } @@ -450,10 +382,7 @@ impl GzDecoder { impl GzDecoder { /// Returns the header associated with this stream, if it was valid pub fn header(&self) -> Option<&GzHeader> { - match self.inner { - GzState::Err(_) | GzState::Header { .. } => None, - _ => Some(&self.header) - } + self.header.as_ref() } /// Acquires a reference to the underlying reader. @@ -492,14 +421,17 @@ impl Read for GzDecoder { loop { match inner { - GzState::Header { state, flag, hasher } => { - match read_gz_header2(reader.get_mut().get_mut(), state, header, flag, hasher) { - Ok(_) => next = Next::Body, - Err(err) => if io::ErrorKind::WouldBlock == err.kind() { - return Err(err); - } else { - next = Next::Err(err); - } + GzState::Header(buf) => { + let mut reader = Buffer::new(buf, reader.get_mut().get_mut()); + match read_gz_header(&mut reader) { + Ok(hdr) => { + *header = Some(hdr); + next = Next::Body; + }, + Err(ref err) if io::ErrorKind::WouldBlock == err.kind() + || io::ErrorKind::UnexpectedEof == err.kind() + => return Err(io::ErrorKind::WouldBlock.into()), + Err(err) => next = Next::Err(err) } }, GzState::Body => { @@ -555,12 +487,8 @@ impl Read for GzDecoder { Next::Header => { reader.reset(); reader.get_mut().reset_data(); - *header = GzHeader::default(); - *inner = GzState::Header { - state: GzHeaderState::Header(0, [0; 10]), - flag: 0, - hasher: Hasher::new() - }; + header.take(); + *inner = GzState::Header(Vec::new()); }, Next::Body => *inner = GzState::Body, Next::Finished => *inner = GzState::Finished(0, [0; 8]), @@ -644,12 +572,6 @@ impl MultiGzDecoder { pub fn new(r: R) -> MultiGzDecoder { MultiGzDecoder(GzDecoder::new(r).multi(true)) } - - /// Creates a new decoder from the given reader. - /// If the gzip stream contains multiple members all will be decoded. - pub fn new2(r: R) -> MultiGzDecoder { - MultiGzDecoder(GzDecoder::new2(r).multi(true)) - } } impl MultiGzDecoder { diff --git a/src/gz/read.rs b/src/gz/read.rs index 7481dd19b..774f4f381 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -142,13 +142,6 @@ impl GzDecoder { inner: bufread::GzDecoder::new(BufReader::new(r)), } } - - /// Creates a new decoder from the given reader. - pub fn new2(r: R) -> GzDecoder { - GzDecoder { - inner: bufread::GzDecoder::new2(BufReader::new(r)), - } - } } impl GzDecoder { @@ -256,14 +249,6 @@ impl MultiGzDecoder { inner: bufread::MultiGzDecoder::new(BufReader::new(r)), } } - - /// Creates a new decoder from the given reader. - /// If the gzip stream contains multiple members all will be decoded. - pub fn new2(r: R) -> MultiGzDecoder { - MultiGzDecoder { - inner: bufread::MultiGzDecoder::new2(BufReader::new(r)), - } - } } impl MultiGzDecoder { diff --git a/tests/async-reader.rs b/tests/async-reader.rs index 0f90fd6a9..b95e4a19b 100644 --- a/tests/async-reader.rs +++ b/tests/async-reader.rs @@ -65,7 +65,7 @@ impl Future for AlwaysNotify { fn test_gz_asyncread() { let f = File::open("tests/good-file.gz").unwrap(); - let fut = read_to_end(AssertAsync(GzDecoder::new2(BadReader::new(f))), Vec::new()); + let fut = read_to_end(AssertAsync(GzDecoder::new(BadReader::new(f))), Vec::new()); let (_, content) = AlwaysNotify(fut).wait().unwrap(); let mut expected = Vec::new(); @@ -81,7 +81,7 @@ fn test_gz_asyncread() { fn test_multi_gz_asyncread() { let f = File::open("tests/multi.gz").unwrap(); - let fut = read_to_end(AssertAsync(MultiGzDecoder::new2(BadReader::new(f))), Vec::new()); + let fut = read_to_end(AssertAsync(MultiGzDecoder::new(BadReader::new(f))), Vec::new()); let (_, content) = AlwaysNotify(fut).wait().unwrap(); let mut expected = Vec::new(); From 38a6809ef81fae005c95ce30f36685ad4d9462f7 Mon Sep 17 00:00:00 2001 From: quininer Date: Mon, 25 Feb 2019 11:57:50 +0800 Subject: [PATCH 11/12] small improve --- src/gz/bufread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index f76d0d08d..226491ec4 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -346,7 +346,7 @@ impl GzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// gzip header. pub fn new(mut r: R) -> GzDecoder { - let mut buf = Vec::new(); + let mut buf = Vec::with_capacity(10); // minimum header length let mut header = None; let result = { From c5b2b88243ab9d4cd7b30c8c7ad3864b984343a9 Mon Sep 17 00:00:00 2001 From: quininer Date: Tue, 12 Mar 2019 13:15:23 +0800 Subject: [PATCH 12/12] remove Next enum --- src/gz/bufread.rs | 162 ++++++++++++++++++++++------------------------ 1 file changed, 78 insertions(+), 84 deletions(-) diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 226491ec4..f06205f41 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -329,14 +329,9 @@ impl<'a, T: Read> Read for Buffer<'a, T> { fn read(&mut self, buf: &mut [u8]) -> io::Result { let mut len = self.buf.read(buf)?; if buf.len() > len { - match self.reader.read(&mut buf[len..])? { - // eof - 0 => return Err(bad_header()), - len2 => { - self.buf.get_mut().get_mut().extend_from_slice(&buf[len..][..len2]); - len += len2; - } - } + let len2 = self.reader.read(&mut buf[len..])?; + self.buf.get_mut().get_mut().extend_from_slice(&buf[len..][..len2]); + len += len2; } Ok(len) } @@ -360,7 +355,6 @@ impl GzDecoder { GzState::Body }, Err(ref err) if io::ErrorKind::WouldBlock == err.kind() - || io::ErrorKind::UnexpectedEof == err.kind() => GzState::Header(buf), Err(err) => GzState::Err(err) }; @@ -408,96 +402,96 @@ impl Read for GzDecoder { fn read(&mut self, into: &mut [u8]) -> io::Result { let GzDecoder { inner, header, reader, multi } = self; - enum Next { - None, - Header, - Body, - Finished, - Err(io::Error), - End - } - - let mut next = Next::None; - loop { - match inner { - GzState::Header(buf) => { - let mut reader = Buffer::new(buf, reader.get_mut().get_mut()); - match read_gz_header(&mut reader) { - Ok(hdr) => { - *header = Some(hdr); - next = Next::Body; - }, - Err(ref err) if io::ErrorKind::WouldBlock == err.kind() - || io::ErrorKind::UnexpectedEof == err.kind() - => return Err(io::ErrorKind::WouldBlock.into()), - Err(err) => next = Next::Err(err) - } + *inner = match mem::replace(inner, GzState::End) { + GzState::Header(mut buf) => { + let result = { + let mut reader = Buffer::new(&mut buf, reader.get_mut().get_mut()); + read_gz_header(&mut reader) + }; + let hdr = result + .map_err(|err| { + if io::ErrorKind::WouldBlock == err.kind() { + *inner = GzState::Header(buf); + } + + err + })?; + *header = Some(hdr); + GzState::Body }, GzState::Body => { if into.is_empty() { + *inner = GzState::Body; return Ok(0); } - match reader.read(into)? { - 0 => next = Next::Finished, - n => return Ok(n) + let n = reader.read(into) + .map_err(|err| { + if io::ErrorKind::WouldBlock == err.kind() { + *inner = GzState::Body; + } + + err + })?; + + match n { + 0 => GzState::Finished(0, [0; 8]), + n => { + *inner = GzState::Body; + return Ok(n); + } } }, - GzState::Finished(pos, buf) => if *pos < buf.len() { - match reader.get_mut().get_mut().read(&mut buf[*pos..]) { - Ok(0) => next = Next::Err(io::ErrorKind::UnexpectedEof.into()), - Ok(n) => *pos += n, - Err(err) => if io::ErrorKind::WouldBlock == err.kind() { - return Err(err); + GzState::Finished(pos, mut buf) => if pos < buf.len() { + let n = reader.get_mut().get_mut() + .read(&mut buf[pos..]) + .and_then(|n| if n == 0 { + Err(io::ErrorKind::UnexpectedEof.into()) } else { - next = Next::Err(err); - } - } - } else { - let (crc, amt) = finish(buf); - - if crc != reader.crc().sum() { - next = Next::Err(corrupt()); - } else if amt != reader.crc().amount() { - next = Next::Err(corrupt()); - } else if !*multi { - next = Next::End; - } else { - match reader.get_mut().get_mut().fill_buf() { - Ok(buf) => if buf.is_empty() { - next = Next::End; - } else { - next = Next::Header; - }, - Err(err) => if io::ErrorKind::WouldBlock == err.kind() { - return Err(err); - } else { - next = Next::Err(err); + Ok(n) + }) + .map_err(|err| { + if io::ErrorKind::WouldBlock == err.kind() { + *inner = GzState::Finished(pos, buf); } + + err + })?; + + GzState::Finished(pos + n, buf) + } else { + let (crc, amt) = finish(&buf); + + if crc != reader.crc().sum() || amt != reader.crc().amount() { + return Err(corrupt()); + } else if *multi { + let is_eof = reader.get_mut().get_mut() + .fill_buf() + .map(|buf| buf.is_empty()) + .map_err(|err| { + if io::ErrorKind::WouldBlock == err.kind() { + *inner = GzState::Finished(pos, buf); + } + + err + })?; + + if is_eof { + GzState::End + } else { + reader.reset(); + reader.get_mut().reset_data(); + header.take(); + GzState::Header(Vec::with_capacity(10)) } + } else { + GzState::End } }, - GzState::Err(err) => next = Next::Err(mem::replace(err, io::ErrorKind::Other.into())), + GzState::Err(err) => return Err(err), GzState::End => return Ok(0) - } - - match mem::replace(&mut next, Next::None) { - Next::None => (), - Next::Header => { - reader.reset(); - reader.get_mut().reset_data(); - header.take(); - *inner = GzState::Header(Vec::new()); - }, - Next::Body => *inner = GzState::Body, - Next::Finished => *inner = GzState::Finished(0, [0; 8]), - Next::Err(err) => { - *inner = GzState::End; - return Err(err); - }, - Next::End => *inner = GzState::End - } + }; } } }