From fed029bc007ad77bfffb69f73d4d1339486a879b Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Thu, 4 Aug 2022 09:32:14 +0200 Subject: [PATCH] ipc FileMetaData expose sizes --- examples/ipc_file_mmap.rs | 4 +-- src/ffi/mmap.rs | 69 ++++++++++++++++++++------------------- src/io/ipc/read/reader.rs | 4 +-- src/mmap/mod.rs | 27 +++++++-------- tests/it/io/ipc/mmap.rs | 5 +-- 5 files changed, 56 insertions(+), 53 deletions(-) diff --git a/examples/ipc_file_mmap.rs b/examples/ipc_file_mmap.rs index 038a8eb8226..51dd2a27477 100644 --- a/examples/ipc_file_mmap.rs +++ b/examples/ipc_file_mmap.rs @@ -8,7 +8,7 @@ use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked}; // Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which // usually `Arc` supports. Here we mock it #[derive(Clone)] -struct Mmap(Arc>); +struct Mmap(Vec); impl AsRef<[u8]> for Mmap { #[inline] @@ -19,7 +19,7 @@ impl AsRef<[u8]> for Mmap { fn main() -> Result<()> { // given a mmap - let mmap = Mmap(Arc::new(vec![])); + let mmap = Arc::new(Mmap(vec![])); // read the metadata let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?; diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 4a84d9804b0..4bd006393f7 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::sync::Arc; use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray}; use crate::datatypes::DataType; @@ -85,11 +86,11 @@ fn get_validity<'a>( } fn create_array< - T: Clone + AsRef<[u8]>, + T: AsRef<[u8]>, I: Iterator>, II: Iterator, >( - data: T, + data: Arc, num_rows: usize, null_count: usize, buffers: I, @@ -111,7 +112,7 @@ fn create_array< let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array))); - let mut private_data = Box::new(PrivateData:: { + let mut private_data = Box::new(PrivateData::> { data, buffers_ptr, children_ptr, @@ -127,7 +128,7 @@ fn create_array< buffers: private_data.buffers_ptr.as_mut_ptr(), children: private_data.children_ptr.as_mut_ptr(), dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), - release: Some(release::), + release: Some(release::>), private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, } } @@ -152,8 +153,8 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_binary>( - data: T, +fn mmap_binary>( + data: Arc, node: &Node, block_offset: usize, buffers: &mut VecDeque, @@ -168,7 +169,7 @@ fn mmap_binary>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -186,8 +187,8 @@ fn mmap_binary>( )) } -fn mmap_fixed_size_binary>( - data: T, +fn mmap_fixed_size_binary>( + data: Arc, node: &Node, block_offset: usize, buffers: &mut VecDeque, @@ -202,7 +203,7 @@ fn mmap_fixed_size_binary>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -218,8 +219,8 @@ fn mmap_fixed_size_binary>( )) } -fn mmap_null>( - data: T, +fn mmap_null>( + data: Arc, node: &Node, _block_offset: usize, _buffers: &mut VecDeque, @@ -244,8 +245,8 @@ fn mmap_null>( )) } -fn mmap_boolean>( - data: T, +fn mmap_boolean>( + data: Arc, node: &Node, block_offset: usize, buffers: &mut VecDeque, @@ -260,7 +261,7 @@ fn mmap_boolean>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -280,13 +281,13 @@ fn mmap_boolean>( )) } -fn mmap_primitive>( - data: T, +fn mmap_primitive>( + data: Arc, node: &Node, block_offset: usize, buffers: &mut VecDeque, ) -> Result { - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let num_rows: usize = node .length() @@ -313,8 +314,8 @@ fn mmap_primitive>( } #[allow(clippy::too_many_arguments)] -fn mmap_list>( - data: T, +fn mmap_list>( + data: Arc, node: &Node, block_offset: usize, data_type: &DataType, @@ -335,7 +336,7 @@ fn mmap_list>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -363,8 +364,8 @@ fn mmap_list>( } #[allow(clippy::too_many_arguments)] -fn mmap_fixed_size_list>( - data: T, +fn mmap_fixed_size_list>( + data: Arc, node: &Node, block_offset: usize, data_type: &DataType, @@ -387,7 +388,7 @@ fn mmap_fixed_size_list>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -412,8 +413,8 @@ fn mmap_fixed_size_list>( } #[allow(clippy::too_many_arguments)] -fn mmap_struct>( - data: T, +fn mmap_struct>( + data: Arc, node: &Node, block_offset: usize, data_type: &DataType, @@ -434,7 +435,7 @@ fn mmap_struct>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); @@ -466,8 +467,8 @@ fn mmap_struct>( } #[allow(clippy::too_many_arguments)] -fn mmap_dict>( - data: T, +fn mmap_dict>( + data: Arc, node: &Node, block_offset: usize, _: &DataType, @@ -486,7 +487,7 @@ fn mmap_dict>( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let data_ref = data.as_ref(); + let data_ref = data.as_ref().as_ref(); let dictionary = dictionaries .get(&ipc_field.dictionary_id.unwrap()) @@ -507,8 +508,8 @@ fn mmap_dict>( )) } -fn get_array>( - data: T, +fn get_array>( + data: Arc, block_offset: usize, data_type: &DataType, ipc_field: &IpcField, @@ -587,8 +588,8 @@ fn get_array>( } /// Maps a memory region to an [`Array`]. -pub(crate) unsafe fn mmap>( - data: T, +pub(crate) unsafe fn mmap>( + data: Arc, block_offset: usize, data_type: DataType, ipc_field: &IpcField, diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9fb428eced5..37b8437c271 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -27,13 +27,13 @@ pub struct FileMetadata { /// The blocks in the file /// /// A block indicates the regions in the file to read to get data - pub(crate) blocks: Vec, + pub blocks: Vec, /// Dictionaries associated to each dict_id pub(crate) dictionaries: Option>, /// The total size of the file in bytes - pub(crate) size: u64, + pub size: u64, } fn read_dictionary_message( diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs index b7822b5f4cd..55bd7d01f80 100644 --- a/src/mmap/mod.rs +++ b/src/mmap/mod.rs @@ -1,5 +1,6 @@ //! Memory maps regions defined on the IPC format into [`Array`]. use std::collections::VecDeque; +use std::sync::Arc; use crate::array::Array; use crate::chunk::Chunk; @@ -74,10 +75,10 @@ fn get_buffers_nodes( Ok((buffers, field_nodes)) } -unsafe fn _mmap_record>( +unsafe fn _mmap_record>( fields: &[Field], ipc_fields: &[IpcField], - data: T, + data: Arc, batch: RecordBatchRef, offset: usize, dictionaries: &Dictionaries, @@ -104,14 +105,14 @@ unsafe fn _mmap_record>( .and_then(Chunk::try_new) } -unsafe fn _mmap_unchecked>( +unsafe fn _mmap_unchecked>( fields: &[Field], ipc_fields: &[IpcField], - data: T, + data: Arc, block: Block, dictionaries: &Dictionaries, ) -> Result>, Error> { - let (message, offset) = read_message(data.as_ref(), block)?; + let (message, offset) = read_message(data.as_ref().as_ref(), block)?; let batch = get_record_batch(message)?; _mmap_record( fields, @@ -134,15 +135,15 @@ unsafe fn _mmap_unchecked>( /// The caller must ensure that `data` contains a valid buffers, for example: /// * Offsets in variable-sized containers must be in-bounds and increasing /// * Utf8 data is valid -pub unsafe fn mmap_unchecked>( +pub unsafe fn mmap_unchecked>( metadata: &FileMetadata, dictionaries: &Dictionaries, - data: T, + data: Arc, chunk: usize, ) -> Result>, Error> { let block = metadata.blocks[chunk]; - let (message, offset) = read_message(data.as_ref(), block)?; + let (message, offset) = read_message(data.as_ref().as_ref(), block)?; let batch = get_record_batch(message)?; _mmap_record( &metadata.schema.fields, @@ -154,13 +155,13 @@ pub unsafe fn mmap_unchecked>( ) } -unsafe fn mmap_dictionary>( +unsafe fn mmap_dictionary>( metadata: &FileMetadata, - data: T, + data: Arc, block: Block, dictionaries: &mut Dictionaries, ) -> Result<(), Error> { - let (message, offset) = read_message(data.as_ref(), block)?; + let (message, offset) = read_message(data.as_ref().as_ref(), block)?; let batch = get_dictionary_batch(&message)?; let id = batch @@ -205,9 +206,9 @@ unsafe fn mmap_dictionary>( /// The caller must ensure that `data` contains a valid buffers, for example: /// * Offsets in variable-sized containers must be in-bounds and increasing /// * Utf8 data is valid -pub unsafe fn mmap_dictionaries_unchecked>( +pub unsafe fn mmap_dictionaries_unchecked>( metadata: &FileMetadata, - data: T, + data: Arc, ) -> Result { let blocks = if let Some(blocks) = &metadata.dictionaries { blocks diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs index 9142227b1fc..ef317b9a69c 100644 --- a/tests/it/io/ipc/mmap.rs +++ b/tests/it/io/ipc/mmap.rs @@ -3,6 +3,7 @@ use arrow2::chunk::Chunk; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::read::read_file_metadata; +use std::sync::Arc; use super::write::file::write; @@ -10,9 +11,9 @@ fn round_trip(array: Box) -> Result<()> { let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); let columns = Chunk::try_new(vec![array.clone()])?; - let data = write(&[columns], &schema, None, None)?; + let data = Arc::new(write(&[columns], &schema, None, None)?); - let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?; + let metadata = read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?; let dictionaries = unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };