fix: copy arrays when placing them in the v2 writer's accumulation queue (#2249)

When a record batch crosses the FFI layer we lose the ability to deallocate that batch one array at a time (every array in the batch keeps a reference-counted pointer to the batch). This is a problem for the writer because we usually want to flush some arrays to disk and keep other arrays in memory for a while longer. However, we do want to release the arrays we flush to disk to avoid memory leaks.

This issue was highlighted in a test / demo that attempted to write video data to a lance file. In this situation we were writing the data one row at a time. There were only 30,000 rows in the source data but this was hundreds of GB. None of the metadata columns would ever flush to disk (e.g. a 4-byte int column @ 30,000 rows is only 120KB). These metadata arrays were keeping the video data alive in memory and the writer was taking up too much memory.

The solution in this PR is to perform a deep copy of any data we are planning on flushing to disk. This is unfortunate, and there is a configuration parameter to disable this copy (I have intentionally chosen not to expose this in the Python API, since data from Python always crosses an FFI boundary and is susceptible to this problem). However, copying this data by default is a much safer course of action.

In the future we could investigate alternative ways of passing the data across the FFI boundary (e.g. instead of sending the entire record batch we could send individual arrays across and then reassemble them into a record batch on the other side). However, I don't think we need to worry too much about this until we see writer CPU performance become an issue. In most cases the batches will probably be in the CPU cache already, so this should be a pretty quick copy. Also, writers that are writing any significant amount of data will be I/O bound.
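The retention problem described above can be reproduced without Arrow or FFI. The following is a minimal sketch using only the Rust standard library. `ColumnView` and `demo` are hypothetical names invented for this illustration, and the 1 MiB payload stands in for the video data; none of this is Lance or Arrow code. It shows that a 4-byte metadata view keeps the whole shared allocation alive, and that deep-copying the metadata before dropping the view lets the allocation be freed.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for an array that crossed the FFI boundary:
/// a view into one allocation shared by every column of the batch.
struct ColumnView {
    batch: Arc<Vec<u8>>,
    start: usize,
    len: usize,
}

impl ColumnView {
    fn bytes(&self) -> &[u8] {
        &self.batch[self.start..self.start + self.len]
    }

    /// Deep copy: the returned bytes own their memory and hold no
    /// reference to the shared batch allocation.
    fn deep_copy(&self) -> Vec<u8> {
        self.bytes().to_vec()
    }
}

/// Returns (batch alive while only the metadata view is held,
///          batch alive after the metadata was deep-copied and the view dropped).
fn demo() -> (bool, bool) {
    // One "batch": 1 MiB of payload followed by a 4-byte metadata value.
    let payload_len = 1 << 20;
    let mut data = vec![0u8; payload_len];
    data.extend_from_slice(&7u32.to_le_bytes());
    let batch = Arc::new(data);
    // A Weak pointer lets us observe whether the allocation still exists.
    let probe: Weak<Vec<u8>> = Arc::downgrade(&batch);

    let video = ColumnView { batch: batch.clone(), start: 0, len: payload_len };
    let meta = ColumnView { batch: batch.clone(), start: payload_len, len: 4 };
    drop(batch);

    // "Flush" the large column by dropping its view. The metadata view
    // still holds a strong reference, so nothing is freed yet.
    drop(video);
    let alive_with_view = probe.upgrade().is_some();

    // Deep copy the small column, then drop the view.
    let owned_meta = meta.deep_copy();
    drop(meta);
    let alive_after_copy = probe.upgrade().is_some();

    assert_eq!(owned_meta, 7u32.to_le_bytes());
    (alive_with_view, alive_after_copy)
}

fn main() {
    let (before, after) = demo();
    println!("batch alive while metadata view is held: {before}");
    println!("batch alive after deep copy: {after}");
}
```

The cost of the copy is proportional to the data being retained, which in the scenario from the commit message is the small metadata (4 bytes × 30,000 rows = 120,000 bytes), not the video payload.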
1 parent 6d1364b, commit fb43192
Showing 12 changed files with 168 additions and 15 deletions.
@@ -0,0 +1,60 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{make_array, Array, RecordBatch};
use arrow_buffer::{Buffer, NullBuffer};
use arrow_data::ArrayData;

pub fn deep_copy_buffer(buffer: &Buffer) -> Buffer {
    Buffer::from(Vec::from(buffer.as_slice()))
}

fn deep_copy_nulls(nulls: &NullBuffer) -> Buffer {
    deep_copy_buffer(nulls.inner().inner())
}

pub fn deep_copy_array_data(data: &ArrayData) -> ArrayData {
    let data_type = data.data_type().clone();
    let len = data.len();
    let null_count = data.null_count();
    let null_bit_buffer = data.nulls().map(deep_copy_nulls);
    let offset = data.offset();
    let buffers = data
        .buffers()
        .iter()
        .map(deep_copy_buffer)
        .collect::<Vec<_>>();
    let child_data = data
        .child_data()
        .iter()
        .map(deep_copy_array_data)
        .collect::<Vec<_>>();
    unsafe {
        ArrayData::new_unchecked(
            data_type,
            len,
            Some(null_count),
            null_bit_buffer,
            offset,
            buffers,
            child_data,
        )
    }
}

pub fn deep_copy_array(array: &dyn Array) -> Arc<dyn Array> {
    let data = array.to_data();
    let data = deep_copy_array_data(&data);
    make_array(data)
}

pub fn deep_copy_batch(batch: &RecordBatch) -> crate::Result<RecordBatch> {
    let arrays = batch
        .columns()
        .iter()
        .map(|array| deep_copy_array(array))
        .collect::<Vec<_>>();
    RecordBatch::try_new(batch.schema().clone(), arrays)
}
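The recursive shape of `deep_copy_array_data` (copy every buffer, then recurse into every child, keeping `len` and `offset` unchanged) can be sketched with standard-library types alone. The `Node` struct and the helpers `deep_copy_bytes`, `deep_copy_node`, `shares_memory`, and `sample` below are hypothetical, simplified stand-ins invented for this illustration. They are not part of arrow-rs or Lance, and they omit data types and null buffers.

```rust
use std::sync::Arc;

/// Hypothetical, simplified stand-in for `ArrayData`: reference-counted
/// buffers plus nested children.
#[derive(Clone, Debug, PartialEq)]
struct Node {
    len: usize,
    offset: usize,
    buffers: Vec<Arc<[u8]>>,
    children: Vec<Node>,
}

/// Copy the bytes into a fresh allocation instead of bumping a refcount.
fn deep_copy_bytes(buffer: &Arc<[u8]>) -> Arc<[u8]> {
    Arc::from(buffer.to_vec())
}

/// Copy every buffer and recurse into every child. `len` and `offset`
/// are carried over unchanged so the copy describes the same logical data.
fn deep_copy_node(node: &Node) -> Node {
    Node {
        len: node.len,
        offset: node.offset,
        buffers: node.buffers.iter().map(deep_copy_bytes).collect(),
        children: node.children.iter().map(deep_copy_node).collect(),
    }
}

/// True if any buffer at the same position in both trees is the same allocation.
fn shares_memory(a: &Node, b: &Node) -> bool {
    a.buffers.iter().zip(&b.buffers).any(|(x, y)| Arc::ptr_eq(x, y))
        || a.children.iter().zip(&b.children).any(|(x, y)| shares_memory(x, y))
}

/// Small two-level example tree.
fn sample() -> Node {
    let parent_buf: Arc<[u8]> = Arc::from(vec![0u8, 1, 2]);
    let child_buf: Arc<[u8]> = Arc::from(vec![9u8, 8, 7]);
    let child = Node { len: 3, offset: 0, buffers: vec![child_buf], children: vec![] };
    Node { len: 2, offset: 1, buffers: vec![parent_buf], children: vec![child] }
}

fn main() {
    let original = sample();
    let shallow = original.clone(); // clones the Arcs, not the bytes
    let deep = deep_copy_node(&original);
    println!("shallow clone shares memory: {}", shares_memory(&original, &shallow));
    println!("deep copy shares memory: {}", shares_memory(&original, &deep));
    println!("deep copy is equal by value: {}", deep == original);
}
```

A plain `clone()` here only increments reference counts, which is the behaviour that keeps the source allocation alive. The deep copy is equal by value but points at different memory.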