diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f110e3d8272a..310dbd34f1f6 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -38,7 +38,7 @@
 let mut compressed = vec![];
 codec.compress(&data[..], &mut compressed).unwrap();
 
 let mut output = vec![];
-codec.decompress(&compressed[..], &mut output).unwrap();
+codec.decompress(&compressed[..], &mut output, None).unwrap();
 assert_eq!(output, data);
 ```
@@ -57,9 +57,18 @@ pub trait Codec: Send {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
 
     /// Decompresses data stored in slice `input_buf` and appends output to `output_buf`.
+    ///
+    /// If `uncompress_size` is provided, it will allocate the exact amount of memory.
+    /// Otherwise, it will estimate the uncompressed size, allocating an amount of memory
+    /// greater than or equal to the real uncompressed size.
+    ///
     /// Returns the total number of bytes written.
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>)
-        -> Result<usize>;
+    fn decompress(
+        &mut self,
+        input_buf: &[u8],
+        output_buf: &mut Vec<u8>,
+        uncompress_size: Option<usize>,
+    ) -> Result<usize>;
 }
 
 /// Given the compression type `codec`, returns a codec used to compress and decompress
@@ -112,8 +121,12 @@ mod snappy_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            let len = decompress_len(input_buf)?;
+            let len = match uncompress_size {
+                Some(size) => size,
+                None => decompress_len(input_buf)?,
+            };
             let offset = output_buf.len();
             output_buf.resize(offset + len, 0);
             self.decoder
@@ -161,6 +174,7 @@ mod gzip_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = read::GzDecoder::new(input_buf);
             decoder.read_to_end(output_buf).map_err(|e| e.into())
@@ -203,8 +217,10 @@ mod brotli_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
+            let buffer_size = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
+            brotli::Decompressor::new(input_buf, buffer_size)
                 .read_to_end(output_buf)
                 .map_err(|e| e.into())
         }
@@ -248,6 +264,7 @@ mod lz4_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = lz4::Decoder::new(input_buf)?;
             let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
@@ -306,6 +323,7 @@ mod zstd_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = zstd::Decoder::new(input_buf)?;
             match io::copy(&mut decoder, output_buf) {
@@ -353,16 +371,23 @@ mod lz4_raw_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let offset = output_buf.len();
-            let required_len = max_uncompressed_size(input_buf.len());
+            let required_len =
+                uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len()));
             output_buf.resize(offset + required_len, 0);
-            let required_len: i32 = required_len.try_into().unwrap();
-            match lz4::block::decompress_to_buffer(input_buf, Some(required_len), &mut output_buf[offset..]) {
+            match lz4::block::decompress_to_buffer(
+                input_buf,
+                Some(required_len.try_into().unwrap()),
+                &mut output_buf[offset..],
+            ) {
                 Ok(n) => {
-                    output_buf.truncate(offset + n);
-                    Ok(n)
-                },
+                    if n < required_len {
+                        output_buf.truncate(offset + n);
+                    }
+                    Ok(n)
+                }
                 Err(e) => Err(e.into()),
             }
         }
@@ -371,11 +396,16 @@ mod lz4_raw_codec {
             let offset = output_buf.len();
             let required_len = lz4::block::compress_bound(input_buf.len())?;
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::compress_to_buffer(input_buf, None, false, &mut output_buf[offset..]) {
+            match lz4::block::compress_to_buffer(
+                input_buf,
+                None,
+                false,
+                &mut output_buf[offset..],
+            ) {
                 Ok(n) => {
                     output_buf.truncate(offset + n);
                     Ok(())
-                },
+                }
                 Err(e) => Err(e.into()),
             }
         }
@@ -390,7 +420,7 @@ mod tests {
 
     use crate::util::test_common::rand_gen::random_bytes;
 
-    fn test_roundtrip(c: CodecType, data: &[u8]) {
+    fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
         let mut c1 = create_codec(c).unwrap().unwrap();
         let mut c2 = create_codec(c).unwrap().unwrap();
 
@@ -402,7 +432,7 @@ mod tests {
 
         // Decompress with c2
         let decompressed_size = c2
-            .decompress(compressed.as_slice(), &mut decompressed)
+            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
             .expect("Error when decompressing");
         assert_eq!(data.len(), decompressed_size);
         assert_eq!(data, decompressed.as_slice());
@@ -416,7 +446,7 @@ mod tests {
 
         // Decompress with c1
         let decompressed_size = c1
-            .decompress(compressed.as_slice(), &mut decompressed)
+            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
             .expect("Error when decompressing");
         assert_eq!(data.len(), decompressed_size);
         assert_eq!(data, decompressed.as_slice());
@@ -435,7 +465,7 @@ mod tests {
         assert_eq!(&compressed[..4], prefix);
 
         let decompressed_size = c2
-            .decompress(&compressed[4..], &mut decompressed)
+            .decompress(&compressed[4..], &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
 
         assert_eq!(data.len(), decompressed_size);
@@ -447,7 +477,8 @@ mod tests {
         let sizes = vec![100, 10000, 100000];
         for size in sizes {
             let data = random_bytes(size);
-            test_roundtrip(c, &data);
+            test_roundtrip(c, &data, None);
+            test_roundtrip(c, &data, Some(data.len()));
         }
     }
 
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6b416e34dc65..854ae1ef6d34 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -420,7 +420,11 @@ pub(crate) fn decode_page(
             let mut decompressed = Vec::with_capacity(uncompressed_size);
             let compressed = &buffer.as_ref()[offset..];
             decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
-            decompressor.decompress(compressed, &mut decompressed)?;
+            decompressor.decompress(
+                compressed,
+                &mut decompressed,
+                Some(uncompressed_size - offset),
+            )?;
 
             if decompressed.len() != uncompressed_size {
                 return Err(general_err!(
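For reviewers, a minimal usage sketch (not part of the patch) of the new `uncompress_size` parameter from the caller's side. It assumes the public API as it stands on this branch: `create_codec` returning `Result<Option<Box<dyn Codec>>>` and the default `snap` feature enabled.

```rust
use parquet::basic::Compression;
use parquet::compression::create_codec;

fn main() {
    let data = b"parquet".to_vec();
    let mut codec = create_codec(Compression::SNAPPY).unwrap().unwrap();

    let mut compressed = vec![];
    codec.compress(&data, &mut compressed).unwrap();

    // Without a hint, the codec estimates the output size (Snappy reads it
    // from the stream header via `decompress_len`), which may over-allocate.
    let mut output = vec![];
    codec.decompress(&compressed, &mut output, None).unwrap();
    assert_eq!(output, data);

    // With the exact size known (e.g. from the Parquet page header), the
    // codec can allocate precisely that many bytes up front.
    let mut output = vec![];
    codec
        .decompress(&compressed, &mut output, Some(data.len()))
        .unwrap();
    assert_eq!(output, data);
}
```

The hint is deliberately `Option<usize>`: `decode_page` knows the exact uncompressed size from the page header and passes `Some(uncompressed_size - offset)`, while callers without that information pass `None` and keep the previous estimating behavior.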