support compression for IPC #1855
Conversation
arrow/src/ipc/writer.rs
Outdated
};
let len = data.len() as i64;
// TODO: don't need to pad each buffer, and just need to pad the tail of the message body
// let pad_len = pad_to_8(len as u32);
If this makes sense to you, I will drop this commented-out code.
Codecov Report
@@ Coverage Diff @@
## master #1855 +/- ##
==========================================
+ Coverage 83.42% 83.47% +0.04%
==========================================
Files 214 215 +1
Lines 57015 57286 +271
==========================================
+ Hits 47567 47819 +252
- Misses 9448 9467 +19
Continue to review full report at Codecov.
Hi @andygrove @alamb, I have implemented the IPC compression, but the CI on AMD 32 fails.
After this PR is merged, I will enable the integration test for 2.0.0-compression.
The integration test failed at https://github.com/apache/arrow-rs/runs/6952125461?check_suite_focus=true#step:6:15070. I'm confused about why we need to make each buffer 8-byte aligned.
@martin-g Could you please take a look at this CI problem?
I think the issue has been resolved.
Thanks @liukun4515 -- I will try and find time tomorrow to review this PR.
arrow/Cargo.toml
Outdated
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
bitflags = { version = "1.2.1", default-features = false }

[features]
default = ["csv", "ipc", "test_utils"]
default = ["csv", "ipc", "test_utils", "zstd", "lz4"]
So by default we will include the zstd and lz4 dependencies? If we don't use ipc, or don't use compressed ipc, it seems these dependencies are not necessary?
I agree -- I suggest something like
default = ["csv", "ipc", "test_utils", "zstd", "lz4"]
default = ["csv", "ipc", "test_utils"]
Yes, include the dependencies by default.
The buffer compression protocol is a common protocol for all languages in the Arrow ecosystem; if we made compression/decompression optional, we might not be able to read a file or stream from a compressed sender.
2.0.0 compression has already been implemented in Java, C++, and other languages. If your Rust server receives a message compressed with this protocol, we can read it by default.
I'd be in favour of adding the deps to ipc, but not creating ipc_compression separately. The inconvenience of a larger binary is a lesser problem than the runtime error of not supporting compression by omission; the latter would need a rebuild.
I have encountered an issue cross-compiling the lz4 crate on some platforms. We don't use ipc, so we can choose not to include the ipc feature at all. I'm wondering, if ipc compression can't be excluded, whether there might be issues like that. Although this sounds like a corner case.
I have added the feature ipc_compression = "lz4,zstd" as an option, if you want to compile with the ipc compression feature.
arrow/src/ipc/reader.rs
Outdated
// TODO consider the error result
#[cfg(any(feature = "zstd,lz4", test))]
Hmm, if these features are not used, what will happen? I think an explicit error is necessary.
I agree with you, @viirya. I have no idea how to control this. Could you please give me some suggestions or advice?
I followed the #[cfg(any(feature = ...))] usage from other modules to resolve the CI error for
cargo build --features=csv,ipc,simd,lz4,zstd --target wasm32-unknown-unknown
If I don't use this method, I can't pass the CI for target wasm32-unknown-unknown.
I searched for the reason and found issue #180, which has the same problem.
@alamb
})
}
}
z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
ipc::MetadataVersion::V4 seems to also hit this error? But actually there is CompressionCodecType::NoCompression, indicating no compression is used.
Also, given that this function returns a Result, it seems like we could return a proper error here rather than panicking.
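For illustration, a minimal sketch of that shape (the helper name and exact error variant are assumptions, not the PR's actual code; it assumes the crate's existing ArrowError and ipc imports):
use crate::error::ArrowError;
use crate::ipc;

// Hypothetical helper: reject unsupported metadata versions with an error value
// instead of panicking, so the caller's Result propagates it.
fn check_metadata_version(version: ipc::MetadataVersion) -> Result<(), ArrowError> {
    if version < ipc::MetadataVersion::V5 {
        Err(ArrowError::InvalidArgumentError(format!(
            "Unsupported ipc::MetadataVersion {:?}",
            version
        )))
    } else {
        Ok(())
    }
}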
I just copied some code from try_new. try_new just uses CompressionCodecType::NoCompression, the same as the older version. try_new_with_compression is meant to enable compression, but I think I made a mistake: I should use CompressionType instead of CompressionCodecType and make sure compression is enabled.
arrow/src/ipc/writer.rs
Outdated
#[cfg(any(feature = "zstd,lz4", test))]
compression_codec
    .compress(buffer.as_slice(), &mut _compression_buffer)
    .unwrap();
I think we can prevent using the compression option in try_new_with_compression if these features are not included.
@viirya same as my reply above. Maybe I can resolve this option/compile issue in a follow-up issue or pull request. In this pull request, we can focus on the protocol of IPC compression.
// write the schema, set the written bytes to the schema + header
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
    writer,
    write_options,
    schema: schema.clone(),
    block_offsets: meta + data + 8,
    block_offsets: meta + data + header_size,
What is this for?
try_new_with_options is used to write the schema to the file or stream.
The layout of the Arrow IPC file format is described in the doc https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
First, we write
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
The size of this part is 8 bytes, which is the value of header_size.
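For reference, a minimal sketch (illustrative only, not the writer's actual code) of how those first 8 bytes are produced:
use std::io::Write;

// The 6-byte magic "ARROW1" plus 2 padding bytes reach the 8-byte boundary,
// which is why header_size is 8 and the first block offset starts after it.
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

fn write_file_header<W: Write>(writer: &mut W) -> std::io::Result<usize> {
    writer.write_all(&ARROW_MAGIC)?; // <magic number "ARROW1">
    writer.write_all(&[0u8; 2])?;    // <empty padding bytes [to 8 byte boundary]>
    Ok(ARROW_MAGIC.len() + 2)        // header_size == 8
}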
Thanks @liukun4515 -- I think this is looking pretty good. The biggest thing I would like to sort out is the feature flags and keeping the compression code a bit more isolated. Otherwise this is looking close.
match self {
    CompressionCodecType::Lz4Frame => {
        let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
        encoder.write_all(input).unwrap();
I suggest passing the errors back out of here directly (as compress returns a Result) rather than calling unwrap(), which will panic on error.
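As a sketch of what that could look like for the lz4 branch (the function name, signature, and error type here are assumptions, not the PR's code):
use std::io::Write;

// Propagate I/O errors with `?` instead of unwrap(): build, write, and finish can
// all fail, and each failure bubbles up to the caller as an Err.
fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> std::io::Result<()> {
    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
    encoder.write_all(input)?;
    let (_writer, result) = encoder.finish(); // finish() returns the writer and a Result
    result
}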
}
}

#[cfg(any(feature = "zstd,lz4", test))]
I wonder if you can put this guard on the entire mod ipc_compression statement so that the whole module (including the tests) is not compiled unless that feature is active.
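Roughly this shape (illustrative only; the final module layout in the PR may differ):
#[cfg(feature = "ipc_compression")]
mod ipc_compression {
    // codec implementations and their #[cfg(test)] tests live here;
    // nothing in this module is compiled unless the feature is enabled
}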
arrow/src/ipc/writer.rs
Outdated
_ => {
    if metadata_version != ipc::MetadataVersion::V5 {
        return Err(ArrowError::InvalidArgumentError(
            "Compress buffer just support from metadata v5".to_string(),
"Compress buffer just support from metadata v5".to_string(), | |
"Compression only supported in metadata v5 and above".to_string(), |
done
arrow/src/ipc/writer.rs
Outdated
match batch_compression_type {
    CompressionCodecType::NoCompression => {}
    _ => {
        if metadata_version != ipc::MetadataVersion::V5 {
Should this check be < instead of != to cover future versions?
if metadata_version != ipc::MetadataVersion::V5 {
if metadata_version < ipc::MetadataVersion::V5 {
done
}

impl IpcWriteOptions {
    pub fn try_new_with_compression(
I wonder if you could avoid the duplication here with more of a Builder style:
impl IpcWriteOptions {
    pub fn with_compression(mut self, batch_compression_type: CompressionCodecType) -> Result<Self> {
        // do checks here
        self.batch_compression_type = batch_compression_type;
        Ok(self)
    }
    ...
}
Then one could use it like:
let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)?
    .with_compression(CompressionCodecType::LZ4)?;
...
The constructor may be refactored in a follow-up PR.
arrow/src/ipc/writer.rs
Outdated
    (buffer.as_slice(), origin_buffer_len as i64)
}
CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
    if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
It seems like this code should throw a "Not supported" error (or panic if it is encountered without support compiled in).
What is the status of this PR? Is it ready to go? Do we need to mess with the feature flags more?
I think there are some comments that have not been addressed yet?
    }
}
CompressionCodecType::NoCompression => Buffer::from(buf_data),
_ => {
How do we handle this if the Rust service is compiled without ipc_compression and receives a message that uses the ipc compression feature?
I think returning an error is the correct way, but as you have identified above, you can't do that without changing the signature to Result<Buffer> -- and since decompression can fail, we probably need to make that change.
@alamb I filed a new issue to track this, and will submit a follow-up PR for it.
arrow/src/ipc/reader.rs
Outdated
}
match compression_codec {
    CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
        if cfg!(feature = "ipc_compression") || cfg!(test) =>
We need to check the compile options when we receive an IPC message that uses the compression feature.
    Ok(())
}
CompressionCodecType::Zstd => {
    let mut encoder = zstd::Encoder::new(output, 0).unwrap();
Should we pass the error out?
Fixed, and the error is now passed out.
    Err(e) => Err(e.into()),
}
}
_ => Ok(input.len()),
Is this for CompressionCodecType::NoCompression? If so, do we need to copy from input to output?
Removed NoCompression from CompressionCodecType.
Now it just handles the LZ4 and ZSTD branches.
arrow/src/ipc/reader.rs
Outdated
    Buffer::from(data)
} else {
    // decompress data using the codec
    let mut _uncompressed_buffer = Vec::new();
We know decompressed_length at this point. Should we allocate the vector with enough capacity?
Good point, we can init the vec with that capacity.
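A small sketch of the idea (the function and variable names are illustrative, not the reader's actual code): pre-size the vector from the length prefix so decoding does not reallocate.
// Decompress a zstd-compressed buffer into a vector pre-sized with the known
// uncompressed length read from the 8-byte prefix.
fn decompress_zstd(input: &[u8], decompressed_length: usize) -> std::io::Result<Vec<u8>> {
    let mut uncompressed = Vec::with_capacity(decompressed_length);
    zstd::stream::copy_decode(input, &mut uncompressed)?;
    Ok(uncompressed)
}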
/// -1: indicate that the data that follows is not compressed
/// 0: indicate that there is no data
/// positive number: indicate the uncompressed length for the following data
fn read_uncompressed_size(buffer: &[u8]) -> i64 {
#[inline]?
done
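For reference, a sketch of what this helper boils down to (assuming, per the format spec, a little-endian i64 prefix and at least 8 bytes in the slice; it mirrors the doc comment above rather than quoting the PR verbatim):
/// Reads the 8-byte length prefix that precedes each compressed buffer:
/// -1 means the data that follows is not compressed, 0 means there is no data,
/// and a positive value is the uncompressed length of the following data.
#[inline]
fn read_uncompressed_size(buffer: &[u8]) -> i64 {
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&buffer[..8]);
    i64::from_le_bytes(prefix)
}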
arrow/src/ipc/reader.rs
Outdated
Some(compression) => match compression.codec() {
    CompressionType::ZSTD => CompressionCodecType::Zstd,
    CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
    _ => CompressionCodecType::NoCompression,
Is there any other option?
No, the current version just supports LZ4 and ZSTD.
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn read_buffer(
TODO: we should change the output arg to Result<Buffer> and return an error message if the buffer can't be read.
do you still intend to make this change? Or is it planned for a subsequent PR?
arrow/src/ipc/writer.rs
Outdated
}
CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
    if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
        (buffer, 0)
(buffer, 0)
(buffer, LENGTH_EMPTY_COMPRESSED_DATA)
// padding and make offset 8 bytes aligned
let pad_len = pad_to_8(len as u32) as i64;
Is it padding relative to the data len, or the total len?
Each buffer has two parts: the metadata, which stores the offset and the actual len of the data, and the data itself. The actual len is the total_len. The pad_len is just used to align the buffer.
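For context, an 8-byte alignment helper like pad_to_8 typically computes the number of padding bytes needed to round a length up to the next multiple of 8 (a sketch; the crate's actual helper may differ slightly):
// Returns how many padding bytes are needed to make `len` a multiple of 8,
// e.g. pad_to_8(5) == 3 and pad_to_8(8) == 0.
fn pad_to_8(len: u32) -> usize {
    (((len + 7) & !7) - len) as usize
}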
lz4 = { version = "1.23", default-features = false }
zstd = { version = "0.11", default-features = false }
I don't think these need to be in dev-dependencies, do they, if they are already in the dependencies of the crate?
We can't remove lz4 and zstd from dev-dependencies. ipc_compression is not in the default features, so we can't run cargo test with the lz4 and zstd libs. But we need to run the ipc compression unit tests in CI.
I tried to remove these from dev-dependencies and ran cargo test. I got this compile error:
error[E0433]: failed to resolve: use of undeclared crate or module `lz4`
--> arrow/src/ipc/compression/ipc_compression.rs:57:39
|
57 | let mut encoder = lz4::EncoderBuilder::new().build(output)?;
| ^^^ use of undeclared crate or module `lz4`
error[E0433]: failed to resolve: use of undeclared crate or module `zstd`
--> arrow/src/ipc/compression/ipc_compression.rs:65:39
|
65 | let mut encoder = zstd::Encoder::new(output, 0)?;
| ^^^^ use of undeclared crate or module `zstd`
|
help: there is a crate or module with a similar name
|
65 | let mut encoder = std::Encoder::new(output, 0)?;
| ~~~
// decompress data using the codec
let mut _uncompressed_buffer =
    Vec::with_capacity(decompressed_length as usize);
let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
Something still isn't quite right with this code -- instead of gating the code on the test feature, I think the more typical pattern is to gate the entire test on the ipc_compression feature. So something like:
#[cfg(feature = "ipc_compression")]
#[test]
fn read_generated_streams_200() {
    let testdata = crate::util::test_util::arrow_test_data();
    let version = "2.0.0-compression";
    ...
}
#[cfg(any(feature = "ipc_compression", test))]
This is not a test feature: it is just the feature ipc_compression or test. The code can only be compiled with the compile flag feature="ipc_compression" or under test.
There is similar usage in parquet:
arrow-rs/parquet/src/compression.rs, line 76 in 30c94db
#[cfg(any(feature = "lz4", test))]
@liukun4515 -- perhaps I can find some time to try and help with this PR. I will try to do so tomorrow.
Thank you, you can also ping me through Slack.
Thanks @liukun4515 -- I think I have gotten to the point where I can't offer any more specific suggestions on structure without trying it myself. I hope to try to rearrange the feature flags and make a proposed PR to your branch. I won't have a chance to do it until tomorrow, however.
@liukun4515 -- I made good progress on sorting out the feature flags. A draft PR is here: liukun4515#1, in case you want to see the direction I am headed. I sadly ran out of time today, but I will try and finish it up tomorrow.
Thanks for your help!!!
Thank you -- I hope to push up an update shortly. It turned into a larger change than I was expecting.
Here is my proposal (the PR to liukun4515#1 is now rendered terribly): #2369
Thanks to @alamb for the cooperation.
Which issue does this PR close?
Closes #1709
Closes #70
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?