diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index b66ab72f8..3b1484446 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -10,7 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use super::{GzBuilder, GzHeader}; use super::{FCOMMENT, FEXTRA, FHCRC, FNAME}; -use crate::crc::CrcReader; +use crate::crc::{Crc, CrcReader}; use crate::deflate; use crate::Compression; @@ -34,87 +34,107 @@ fn bad_header() -> io::Error { io::Error::new(io::ErrorKind::InvalidInput, "invalid gzip header") } -fn read_le_u16(r: &mut R) -> io::Result { +fn read_le_u16(r: &mut Buffer) -> io::Result { let mut b = [0; 2]; - r.read_exact(&mut b)?; + r.read_and_forget(&mut b)?; Ok((b[0] as u16) | ((b[1] as u16) << 8)) } -pub(crate) fn read_gz_header(r: &mut R) -> io::Result { - let mut crc_reader = CrcReader::new(r); - let mut header = [0; 10]; - crc_reader.read_exact(&mut header)?; - - let id1 = header[0]; - let id2 = header[1]; - if id1 != 0x1f || id2 != 0x8b { - return Err(bad_header()); - } - let cm = header[2]; - if cm != 8 { - return Err(bad_header()); - } - - let flg = header[3]; - let mtime = ((header[4] as u32) << 0) - | ((header[5] as u32) << 8) - | ((header[6] as u32) << 16) - | ((header[7] as u32) << 24); - let _xfl = header[8]; - let os = header[9]; - - let extra = if flg & FEXTRA != 0 { - let xlen = read_le_u16(&mut crc_reader)?; - let mut extra = vec![0; xlen as usize]; - crc_reader.read_exact(&mut extra)?; - Some(extra) - } else { - None - }; - let filename = if flg & FNAME != 0 { - // wow this is slow - let mut b = Vec::new(); - for byte in crc_reader.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; +fn read_gz_header_part<'a, R: Read>(r: &'a mut Buffer<'a, R>) -> io::Result<()> { + loop { + match r.part.state { + GzHeaderParsingState::Start => { + let mut header = [0; 10]; + r.read_and_forget(&mut header)?; + + if header[0] != 0x1f || header[1] != 0x8b { + return Err(bad_header()); + } + if header[2] != 8 { + return Err(bad_header()); + } + + r.part.flg = header[3]; + r.part.header.mtime = ((header[4] as u32) << 0) + | ((header[5] as u32) << 8) + | ((header[6] as u32) << 16) + | ((header[7] as u32) << 24); + let _xfl = header[8]; + r.part.header.operating_system = header[9]; + r.part.state = GzHeaderParsingState::Xlen; } - b.push(byte); - } - Some(b) - } else { - None - }; - let comment = if flg & FCOMMENT != 0 { - // wow this is slow - let mut b = Vec::new(); - for byte in crc_reader.by_ref().bytes() { - let byte = byte?; - if byte == 0 { - break; + GzHeaderParsingState::Xlen => { + if r.part.flg & FEXTRA != 0 { + r.part.xlen = read_le_u16(r)?; + } + r.part.state = GzHeaderParsingState::Extra; + } + GzHeaderParsingState::Extra => { + if r.part.flg & FEXTRA != 0 { + let mut extra = vec![0; r.part.xlen as usize]; + r.read_and_forget(&mut extra)?; + r.part.header.extra = Some(extra); + } + r.part.state = GzHeaderParsingState::Filename; + } + GzHeaderParsingState::Filename => { + if r.part.flg & FNAME != 0 { + if None == r.part.header.filename { + r.part.header.filename = Some(Vec::new()); + }; + for byte in r.bytes() { + let byte = byte?; + if byte == 0 { + break; + } + } + } + r.part.state = GzHeaderParsingState::Comment; + } + GzHeaderParsingState::Comment => { + if r.part.flg & FCOMMENT != 0 { + if None == r.part.header.comment { + r.part.header.comment = Some(Vec::new()); + }; + for byte in r.bytes() { + let byte = byte?; + if byte == 0 { + break; + } + } + } + r.part.state = GzHeaderParsingState::Crc; + } + GzHeaderParsingState::Crc => { + if r.part.flg & FHCRC != 0 { + let stored_crc = read_le_u16(r)?; + let calced_crc = r.part.crc.sum() as u16; + if stored_crc != calced_crc { + return Err(corrupt()); + } + } + return Ok(()); } - b.push(byte); } - Some(b) - } else { - None + } +} + +pub(crate) fn read_gz_header(r: &mut R) -> io::Result { + let mut part = GzHeaderPartial::new(); + + let result = { + let mut reader = Buffer::new(&mut part, r); + read_gz_header_part(&mut reader) }; - if flg & FHCRC != 0 { - let calced_crc = crc_reader.crc().sum() as u16; - let stored_crc = read_le_u16(&mut crc_reader)?; - if calced_crc != stored_crc { - return Err(corrupt()); + match result { + Ok(()) => { + return Ok(part.take_header()); } - } - - Ok(GzHeader { - extra: extra, - filename: filename, - comment: comment, - operating_system: os, - mtime: mtime, - }) + Err(err) => { + return Err(err); + } + }; } /// A gzip streaming encoder @@ -304,9 +324,52 @@ pub struct GzDecoder { multi: bool, } +#[derive(Debug)] +pub enum GzHeaderParsingState { + Start, + Xlen, + Extra, + Filename, + Comment, + Crc, +} + +#[derive(Debug)] +pub struct GzHeaderPartial { + buf: Vec, + state: GzHeaderParsingState, + flg: u8, + xlen: u16, + crc: Crc, + header: GzHeader, +} + +impl GzHeaderPartial { + fn new() -> GzHeaderPartial { + GzHeaderPartial { + buf: Vec::with_capacity(10), // minimum header length + state: GzHeaderParsingState::Start, + flg: 0, + xlen: 0, + crc: Crc::new(), + header: GzHeader { + extra: None, + filename: None, + comment: None, + operating_system: 0, + mtime: 0, + }, + } + } + + pub fn take_header(self) -> GzHeader { + return self.header; + } +} + #[derive(Debug)] enum GzState { - Header(Vec), + Header(GzHeaderPartial), Body, Finished(usize, [u8; 8]), Err(io::Error), @@ -317,55 +380,91 @@ enum GzState { /// further data from `reader`. This will also buffer all data read from /// `reader` into `buf` for reuse on a further call. struct Buffer<'a, T: 'a> { - buf: &'a mut Vec, + part: &'a mut GzHeaderPartial, buf_cur: usize, buf_max: usize, reader: &'a mut T, } impl<'a, T> Buffer<'a, T> { - fn new(buf: &'a mut Vec, reader: &'a mut T) -> Buffer<'a, T> { + fn new(part: &'a mut GzHeaderPartial, reader: &'a mut T) -> Buffer<'a, T> { Buffer { reader, buf_cur: 0, - buf_max: buf.len(), - buf, + buf_max: part.buf.len(), + part, } } } impl<'a, T: Read> Read for Buffer<'a, T> { fn read(&mut self, buf: &mut [u8]) -> io::Result { - if self.buf_cur == self.buf_max { + let mut bufref = match self.part.state { + GzHeaderParsingState::Filename => self.part.header.filename.as_mut(), + GzHeaderParsingState::Comment => self.part.header.comment.as_mut(), + _ => None, + }; + if let Some(ref mut b) = bufref { + // we have a direct reference to a buffer where to write let len = self.reader.read(buf)?; - self.buf.extend_from_slice(&buf[..len]); + if len > 0 && buf[len - 1] == 0 { + // we do not append the final 0 + b.extend_from_slice(&buf[..len - 1]); + } else { + b.extend_from_slice(&buf[..len]); + } + self.part.crc.update(&buf[..len]); + Ok(len) + } else if self.buf_cur == self.buf_max { + // we read new bytes and also save them in self.part.buf + let len = self.reader.read(buf)?; + self.part.buf.extend_from_slice(&buf[..len]); + self.part.crc.update(&buf[..len]); Ok(len) } else { - let len = (&self.buf[self.buf_cur..self.buf_max]).read(buf)?; + // we first read the previously saved bytes + let len = (&self.part.buf[self.buf_cur..self.buf_max]).read(buf)?; self.buf_cur += len; Ok(len) } } } +impl<'a, T> Buffer<'a, T> +where + T: std::io::Read, +{ + // If we manage to read all the bytes, we reset the buffer + fn read_and_forget(&mut self, buf: &mut [u8]) -> io::Result { + self.read_exact(buf)?; + // we managed to read the whole buf + // we will no longer need the previously saved bytes in self.part.buf + let rlen = buf.len(); + self.part.buf.truncate(0); + self.buf_cur = 0; + self.buf_max = 0; + return Ok(rlen); + } +} + impl GzDecoder { /// Creates a new decoder from the given reader, immediately parsing the /// gzip header. pub fn new(mut r: R) -> GzDecoder { - let mut buf = Vec::with_capacity(10); // minimum header length + let mut part = GzHeaderPartial::new(); let mut header = None; let result = { - let mut reader = Buffer::new(&mut buf, &mut r); - read_gz_header(&mut reader) + let mut reader = Buffer::new(&mut part, &mut r); + read_gz_header_part(&mut reader) }; let state = match result { - Ok(hdr) => { - header = Some(hdr); + Ok(()) => { + header = Some(part.take_header()); GzState::Body } - Err(ref err) if io::ErrorKind::WouldBlock == err.kind() => GzState::Header(buf), + Err(ref err) if io::ErrorKind::WouldBlock == err.kind() => GzState::Header(part), Err(err) => GzState::Err(err), }; @@ -419,20 +518,22 @@ impl Read for GzDecoder { loop { *inner = match mem::replace(inner, GzState::End) { - GzState::Header(mut buf) => { + GzState::Header(mut part) => { let result = { - let mut reader = Buffer::new(&mut buf, reader.get_mut().get_mut()); - read_gz_header(&mut reader) + let mut reader = Buffer::new(&mut part, reader.get_mut().get_mut()); + read_gz_header_part(&mut reader) }; - let hdr = result.map_err(|err| { - if io::ErrorKind::WouldBlock == err.kind() { - *inner = GzState::Header(buf); + let state = match result { + Ok(()) => { + *header = Some(part.take_header()); + GzState::Body } - - err - })?; - *header = Some(hdr); - GzState::Body + Err(ref err) if io::ErrorKind::WouldBlock == err.kind() => { + GzState::Header(part) + } + Err(err) => GzState::Err(err), + }; + state } GzState::Body => { if into.is_empty() { @@ -503,7 +604,7 @@ impl Read for GzDecoder { reader.reset(); reader.get_mut().reset_data(); header.take(); - GzState::Header(Vec::with_capacity(10)) + GzState::Header(GzHeaderPartial::new()) } } else { GzState::End @@ -639,3 +740,156 @@ impl AsyncWrite for MultiGzDecoder { self.get_mut().shutdown() } } + +#[cfg(test)] +mod tests { + use crate::gz::bufread::*; + use std::io; + use std::io::{Cursor, Read, Write}; + + //a cursor turning EOF into blocking errors + #[derive(Debug)] + pub struct BlockingCursor { + pub cursor: Cursor>, + } + + impl BlockingCursor { + pub fn new() -> BlockingCursor { + BlockingCursor { + cursor: Cursor::new(Vec::new()), + } + } + + pub fn set_position(&mut self, pos: u64) { + return self.cursor.set_position(pos); + } + + pub fn position(&mut self) -> u64 { + return self.cursor.position(); + } + } + + impl Write for BlockingCursor { + fn write(&mut self, buf: &[u8]) -> io::Result { + return self.cursor.write(buf); + } + fn flush(&mut self) -> io::Result<()> { + return self.cursor.flush(); + } + } + + impl Read for BlockingCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + //use the cursor, except it turns eof into blocking error + let r = self.cursor.read(buf); + match r { + Err(ref err) => { + if err.kind() == io::ErrorKind::UnexpectedEof { + return Err(io::ErrorKind::WouldBlock.into()); + } + } + Ok(0) => { + //regular EOF turned into blocking error + return Err(io::ErrorKind::WouldBlock.into()); + } + Ok(_n) => {} + } + return r; + } + } + #[test] + // test function read_and_forget of Buffer + fn buffer_read_and_forget() { + // this is unused except for the buffering + let mut part = GzHeaderPartial::new(); + // this is a reader which receives data afterwards + let mut r = BlockingCursor::new(); + let data = vec![1, 2, 3]; + let mut out = Vec::with_capacity(7); + + match r.write_all(&data) { + Ok(()) => {} + _ => { + panic!("Unexpected result for write_all"); + } + } + r.set_position(0); + + // First read : successful for one byte + let mut reader = Buffer::new(&mut part, &mut r); + out.resize(1, 0); + match reader.read_and_forget(&mut out) { + Ok(1) => {} + _ => { + panic!("Unexpected result for read_and_forget with data"); + } + } + + // Second read : incomplete for 7 bytes (we have only 2) + out.resize(7, 0); + match reader.read_and_forget(&mut out) { + Err(ref err) => { + assert_eq!(io::ErrorKind::WouldBlock, err.kind()); + } + _ => { + panic!("Unexpected result for read_and_forget with incomplete"); + } + } + + // 3 more data bytes have arrived + let pos = r.position(); + let data2 = vec![4, 5, 6]; + match r.write_all(&data2) { + Ok(()) => {} + _ => { + panic!("Unexpected result for write_all"); + } + } + r.set_position(pos); + + // Third read : still incomplete for 7 bytes (we have 5) + let mut reader2 = Buffer::new(&mut part, &mut r); + match reader2.read_and_forget(&mut out) { + Err(ref err) => { + assert_eq!(io::ErrorKind::WouldBlock, err.kind()); + } + _ => { + panic!("Unexpected result for read_and_forget with more incomplete"); + } + } + + // 3 more data bytes have arrived again + let pos2 = r.position(); + let data3 = vec![7, 8, 9]; + match r.write_all(&data3) { + Ok(()) => {} + _ => { + panic!("Unexpected result for write_all"); + } + } + r.set_position(pos2); + + // Fourth read : now succesful for 7 bytes + let mut reader3 = Buffer::new(&mut part, &mut r); + match reader3.read_and_forget(&mut out) { + Ok(7) => { + assert_eq!(out[0], 2); + assert_eq!(out[6], 8); + } + _ => { + panic!("Unexpected result for read_and_forget with data"); + } + } + + // Fifth read : succesful for one more byte + out.resize(1, 0); + match reader3.read_and_forget(&mut out) { + Ok(1) => { + assert_eq!(out[0], 9); + } + _ => { + panic!("Unexpected result for read_and_forget with data"); + } + } + } +}