fix: copy arrays when placing them in the v2 writer's accumulation queue #2249
Conversation
Codecov Report

Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##              main    #2249      +/-  ##
==========================================
+ Coverage    81.03%   81.08%   +0.05%
==========================================
  Files          186      187       +1
  Lines        53753    53963     +210
  Branches     53753    53963     +210
==========================================
+ Hits         43558    43756     +198
- Misses        7715     7730      +15
+ Partials      2480     2477       -3
```
The branch was force-pushed from bb54a47 to 1e1bfca.
```rust
use arrow_buffer::Buffer;
use arrow_data::ArrayData;

/// Copies the buffer's bytes into a newly allocated, Rust-owned buffer.
pub fn deep_copy_buffer(buffer: &Buffer) -> Buffer {
    Buffer::from(Vec::from(buffer.as_slice()))
}
```
Do we need to copy data here? Can we just do buffer.data.clone()?
https://docs.rs/arrow-buffer/51.0.0/src/arrow_buffer/buffer/immutable.rs.html#34
We cannot. When the buffer is imported over FFI, its refcount is tied to the entire batch, so it can't be freed until every other column in the same batch is also ready to be freed. By copying the buffer into Rust, we gain control over the lifetime and can free it earlier.
As Weston said in the PR description, we could solve this by importing each column individually over FFI.
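To make the distinction concrete, here is a minimal sketch (not from the PR) contrasting a shallow clone with the deep copy, using `Buffer::clone()` as the public equivalent of cloning the inner `Arc`:

```rust
use arrow_buffer::Buffer;

// Shallow: `Buffer` is Arc-backed, so clone() only bumps a refcount and the
// bytes stay pinned to whatever allocation backs them (for an FFI import,
// that allocation belongs to the whole batch).
fn shallow_copy(buffer: &Buffer) -> Buffer {
    buffer.clone()
}

// Deep: materialize the bytes into a fresh Rust-owned Vec, so this buffer's
// lifetime is independent of the original allocation and it can be freed as
// soon as the flushed array is dropped.
fn deep_copy(buffer: &Buffer) -> Buffer {
    Buffer::from(Vec::from(buffer.as_slice()))
}
```

The shallow version is free but keeps the FFI allocation alive; the deep version pays a memcpy to make the lifetime independent.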
```rust
// Push into buffered_arrays without copy since we are about to flush anyways
self.buffered_arrays.push(array);
```
Nice 👍
When a record batch crosses the FFI layer we lose the ability to deallocate that batch one array at a time (every array in the batch keeps a reference counted pointer to the batch). This is a problem for the writer because we usually want to flush some arrays to disk and keep other arrays in memory for a while longer. However, we do want to release the arrays we flush to disk to avoid memory leaks.
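As a plain-Rust analogue of that retention behaviour (illustrative types only, not the arrow-rs FFI machinery): every imported column holds a reference to the same shared allocation, so dropping the columns you flush releases nothing while any sibling column survives.

```rust
use std::sync::Arc;

// Illustrative stand-ins, not arrow-rs types: after an FFI import, every
// column in the batch keeps the same shared allocation alive.
struct BatchAllocation {
    bytes: Vec<u8>, // imagine the multi-GB video payload living here
}

struct ImportedColumn {
    name: &'static str,
    backing: Arc<BatchAllocation>,
}

fn main() {
    let allocation = Arc::new(BatchAllocation { bytes: vec![0u8; 1 << 20] });
    let video = ImportedColumn { name: "video", backing: allocation.clone() };
    let frame_id = ImportedColumn { name: "frame_id", backing: allocation.clone() };
    drop(allocation);

    // "Flush" the big column: dropping it does not release the allocation,
    // because the tiny metadata column still holds a reference to it.
    println!("flushing {} to disk", video.name);
    drop(video);
    println!(
        "{} alone still pins {} bytes (refcount = {})",
        frame_id.name,
        frame_id.backing.bytes.len(),
        Arc::strong_count(&frame_id.backing)
    );
}
```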
This issue was highlighted in a test / demo that attempted to write video data to a lance file. In that situation we were writing the data one row at a time. There were only 30,000 rows in the source data, but they amounted to hundreds of GB. None of the metadata columns would ever flush to disk (e.g. a 4-byte int column at 30,000 rows is only 120KB). These metadata arrays were keeping the video data alive in memory and the writer was using far too much memory.
The solution in this PR is to perform a deep copy of any data we are planning on flushing to disk. This is unfortunate, and there is a configuration parameter to disable the copy (I have intentionally chosen not to expose it in the python API, since data from python always crosses an FFI boundary and is therefore susceptible to this problem). However, copying this data by default is a much safer course of action.
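For illustration, here is a hedged sketch of what such a deep copy looks like in arrow-rs terms; the function names are assumptions for this example rather than the writer's actual API, and offset handling is kept naive:

```rust
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer};
use arrow_data::ArrayData;

// Copy a buffer's bytes into freshly allocated, Rust-owned memory.
fn deep_copy_buffer(buffer: &Buffer) -> Buffer {
    Buffer::from(Vec::from(buffer.as_slice()))
}

// Copy the validity bitmap as well, preserving its bit offset and length.
fn deep_copy_nulls(nulls: &NullBuffer) -> NullBuffer {
    let bitmap = BooleanBuffer::new(
        deep_copy_buffer(nulls.buffer()),
        nulls.offset(),
        nulls.len(),
    );
    NullBuffer::new(bitmap)
}

// Recursively rebuild an ArrayData from copied buffers and children so its
// lifetime no longer depends on the FFI-imported batch it came from.
fn deep_copy_array_data(data: &ArrayData) -> ArrayData {
    ArrayData::builder(data.data_type().clone())
        .len(data.len())
        .offset(data.offset())
        .nulls(data.nulls().map(deep_copy_nulls))
        .buffers(data.buffers().iter().map(deep_copy_buffer).collect())
        .child_data(data.child_data().iter().map(deep_copy_array_data).collect())
        .build()
        .expect("copied buffers describe the same, already-valid layout")
}
```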
In the future we could investigate alternative ways of passing the data across the FFI boundary (e.g. instead of sending the entire record batch we could send individual arrays across and then reassemble them into a record batch on the other side). However, I don't think we need to worry too much about this until we see writer CPU performance become an issue. In most cases the batches will probably already be in the CPU cache, so the copy should be quick. Also, writers that are writing any significant amount of data will be I/O bound.
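If that route were explored, a per-column FFI round trip might look roughly like the following sketch (illustrative only, not part of this PR), built on arrow-rs's `to_ffi`/`from_ffi` helpers:

```rust
use std::sync::Arc;

use arrow::array::{make_array, Array, ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ffi::{from_ffi, to_ffi};
use arrow::record_batch::RecordBatch;

// Export each column through its own FFI_ArrowArray/FFI_ArrowSchema pair and
// reassemble the batch on the consumer side. Each imported column then owns
// its own release callback, so it can be freed independently of its siblings.
fn roundtrip_per_column(batch: &RecordBatch) -> Result<RecordBatch> {
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for column in batch.columns() {
        // "Producer" side: send this column across the boundary on its own.
        let (ffi_array, ffi_schema) = to_ffi(&column.to_data())?;
        // "Consumer" side: import it back as an independently owned array.
        let imported = from_ffi(ffi_array, &ffi_schema)?;
        columns.push(make_array(imported));
    }
    RecordBatch::try_new(batch.schema(), columns)
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("frame_id", DataType::Int32, false),
        Field::new("caption", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])) as ArrayRef,
        ],
    )?;
    let reassembled = roundtrip_per_column(&batch)?;
    assert_eq!(batch, reassembled);
    Ok(())
}
```

With per-column import, flushing and dropping one column would no longer pin its siblings' memory, at the cost of a more involved FFI handshake.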