diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d50281b61..c1bdaeeaa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,6 +24,8 @@ jobs: deactivate - name: Run run: cargo test + - name: Run lz4-flex + run: cargo test --no-default-features --features lz4_flex,bloom_filter,stream,snappy,brotli,zstd,gzip clippy: name: Clippy diff --git a/Cargo.toml b/Cargo.toml index 657355c5b..3ea2a91ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ brotli = { version = "^3.3", optional = true } flate2 = { version = "^1.0", optional = true } lz4 = { version = "1.23.3", optional = true } zstd = { version = "^0.11", optional = true, default-features = false } +lz4_flex = { version = "^0.9.2", optional = true } xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] } diff --git a/src/compression.rs b/src/compression.rs index a302ef0d4..2760fcc5d 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -62,6 +62,17 @@ pub fn compress( crate::error::Feature::Snappy, "compress to snappy".to_string(), )), + #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] + Compression::Lz4Raw => { + let output_buf_len = output_buf.len(); + let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len()); + output_buf.resize(output_buf_len + required_len, 0); + + let compressed_size = + lz4_flex::block::compress_into(input_buf, &mut output_buf[output_buf_len..])?; + output_buf.truncate(output_buf_len + compressed_size); + Ok(()) + } #[cfg(feature = "lz4")] Compression::Lz4Raw => { let output_buf_len = output_buf.len(); @@ -76,7 +87,7 @@ pub fn compress( output_buf.truncate(output_buf_len + size); Ok(()) } - #[cfg(not(feature = "lz4"))] + #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))] Compression::Lz4Raw => Err(Error::FeatureNotActive( crate::error::Feature::Lz4, "compress to lz4".to_string(), @@ -153,13 +164,17 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [ crate::error::Feature::Snappy, "decompress with snappy".to_string(), )), + #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] + Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf) + .map(|_| {}) + .map_err(|e| e.into()), #[cfg(feature = "lz4")] Compression::Lz4Raw => { lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf) .map(|_| {}) .map_err(|e| e.into()) } - #[cfg(not(feature = "lz4"))] + #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))] Compression::Lz4Raw => Err(Error::FeatureNotActive( crate::error::Feature::Lz4, "decompress with lz4".to_string(), diff --git a/src/error.rs b/src/error.rs index c8486d687..37a13dcda 100644 --- a/src/error.rs +++ b/src/error.rs @@ -64,6 +64,20 @@ impl From for Error { } } +#[cfg(feature = "lz4_flex")] +impl From for Error { + fn from(e: lz4_flex::block::DecompressError) -> Error { + Error::General(format!("underlying lz4_flex error: {}", e)) + } +} + +#[cfg(feature = "lz4_flex")] +impl From for Error { + fn from(e: lz4_flex::block::CompressError) -> Error { + Error::General(format!("underlying lz4_flex error: {}", e)) + } +} + impl From for Error { fn from(e: parquet_format_async_temp::thrift::Error) -> Error { Error::General(format!("underlying thrift error: {}", e))