-
Notifications
You must be signed in to change notification settings - Fork 842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(arrow-ipc) Add: Support FileReader and StreamReader skip array data validation #6938
base: main
Are you sure you want to change the base?
Conversation
FYI, clippy does complain about too many arguments on one or two function signatures atm. I just want to give this PR a go for now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately as implemented this is unsound, in that it permits UB via safe functions, this probably needs a bit more thought
@@ -79,6 +79,7 @@ fn create_array( | |||
field: &Field, | |||
variadic_counts: &mut VecDeque<i64>, | |||
require_alignment: bool, | |||
skip_validations: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically all of these functions are now unsound and must be marked unsafe
. This is not ideal, and probably needs a bit more thought into handling this better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One possibility might be to create a new function like:
fn unsafe create_array_unchecked(
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
require_alignment: bool,
skip_validations: bool,
) -> Result<ArrayRef, ArrowError> {
And change the existing function to call it:
fn create_array(
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
let skip_validations = false;
// safety: enable validatiions when checking
unsafe { create_array_unchecked(reader, field, variadic_counts, require_aligment, skip_validations) };
}
This is kind of messy though -- finding some way to encapsulate the settings into a struct would be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls check the latest commit, which separating unsafe codes.
Each function like create_array
, now has a unsafe version that skips validation. Codebase is not idea, but currently I don't have much better idea to encapusulate it.
Refactor all unsafe codes into separate functions. It is a bit messy as mentioned @alamb mentioned. From bottom-to-up, there are many code movings and renames. Now, /// An iterator over the record batches (without validation) in an Arrow file
pub struct UnvalidatedFileReader<R: Read + Seek> {
reader: FileReader<R>,
}
impl<R: Read + Seek> Iterator for UnvalidatedFileReader<R> {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
if self.reader.current_block < self.reader.total_blocks {
// Use the unsafe `maybe_next_unvalidated` function
unsafe {
match self.reader.maybe_next_unvalidated() {
Ok(Some(batch)) => Some(Ok(batch)),
Ok(None) => None, // End of the file
Err(e) => Some(Err(e)),
}
}
} else {
None
}
}
} This still unsound iteractor implementation, but I am not sure how to express it so that we can use iterator with validation disabled. Any thoughts or suggestions? |
Thanks @totoroyyb - I will try and review this PR in the next day or two. |
I am starting to review this PR now -- thank you for your patience |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, thank you @totoroyyb -- I think this is a very nice feature and it would be great to get in. The current code is quite nicely docmented
At a minimum I think the PR needs
- Tests: There should be tests that demonstrate that reading / writing data while skipping validation still works functionally. This includes at a minimum testing any new
pub
apis - Benchmarks: given the goal of this PR is to improve performance I think we need a benchmark to show it is actually improving the speed. Unfortunately, I wasn't able to find any existing arrow-ipc benchmarks, but can help draft one shortly
I would like to suggest we break it down a little differently to minimize API changes
- Avoid adding
ArrayDataBuilder::build_aligned_unchecked
and instead simply add a flag on the builder. I will try and add a PR on this shortly - Consider adding flags to
StreamReader
andFilerReader
rather than new structs likeUnvalidatedStreamReader
/// This is useful when the file is known to be valid and the user wants to skip validations. | ||
/// This might be useful when the content is trusted and the user wants to avoid the overhead of | ||
/// validating the content. | ||
pub fn try_new_unvalidated( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it doesn't set the unvalidated flag. It seems like maybe we can direct the users to the `FileReaderBuilder API if they want to avoid validation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one was left here due to the changes I made in the last commit.
@@ -1148,6 +1528,30 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> { | |||
} | |||
} | |||
|
|||
/// An iterator over the record batches (without validation) in an Arrow file | |||
pub struct UnvalidatedFileReader<R: Read + Seek> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the need for a whole different struct -- wouldn't a flag on FileReader
(to mirror the flag on StreamReader
) be more consistent?
TLDR is I think this is an important PR / feature @totoroyyb and I would like to help make it happen. I will make some initial PRs that hopefully move us forwards Would you be willing to help write tests and benchmarks? |
@alamb Thanks for reviewing this. Yes, I'd be happy to help with benchmarks and test cases. |
Co-authored-by: Andrew Lamb <[email protected]>
@alamb Regarding adding a flag in the builder, which I did that way in the first commit. tustvold has suggested that we may need to mark unsafe for all functions that skip validation (here). Seems like with a flag, there is no good way to mark it as unsafe for related functions. Let me know if I misunderstand something. Thanks! |
Yeah, sorry this is a bit subtle -- a new However I think a However, given that the code in arrow-ipc/src/reader.rs is currently mostly structured as free functions, rather than methods on a struct, there is no place to put such a field So what I was suggesting was to restructure the code in fn create_array(
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> { We turn this into a method on ArrayReader: impl ArrayReader {
fn create_array(
&mut self,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> { And then we can add a field on the ArrayReader for skipping validation Note the methods might not belong on |
Thank you @totoroyyb Thank you so much.For benchmarks, I wrote up a description here:
For tests, I think we could leverage the existing tests. For example in these locations: arrow-rs/arrow-ipc/src/writer.rs Lines 1799 to 1801 in 955180b
We could add a few lines of code that deserialized the |
In terms of next steps for this PR I recommend:
Once those three PRs are done and merged, then we can rebase / rework this PR to add the Let me know what you think @totoroyyb . If you have time, the refactoring or benchmarks might be a good next step. Depending on if I can find time, I may also be able to help (maybe try to refactor the code into a struct) |
Thanks @alamb for detailed description and explanation on everything. I might not be readily available in the following a few days, so if anyone wants to jump in, please feel free to do so. I will take another detail look into this asap. I agree that there is no good way to adhere Rust unsafe semantics here and I feel like put it as a field is the best we can do. When I got time, I will take on benchmark and refactor and let you know. |
Which issue does this PR close?
Rationale for this change
Beforehand, array data validation is performed for each array creation when reading an IPC file (or stream), which comes with a significant overhead. In some cases, this overhead is unwanted or the file content is trusted.
There are existing functions defined in the codebase to avoid data validation but this is not exposed to the upper level APIs. This PR brings options for both
FileReader
andStreamReader
to disable it.What changes are included in this PR?
skip_validations
, as an argument, to multiple functions signatures.try_new_unvalidated
toFileReader
with_skip_validations
toStreamReader
Are there any user-facing changes?
No, I don't think so. There are no API-breaking changes, and essentially, two new APIs are introduced. Other changes are on the internal codes I believe.