From 7b9509386a6d3833f7910aafb985965010e36dd3 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 10 Aug 2021 17:13:15 +0000 Subject: [PATCH 1/3] Simplified code. --- src/array/binary/ffi.rs | 2 +- src/array/boolean/ffi.rs | 3 + src/array/list/ffi.rs | 2 +- src/array/primitive/ffi.rs | 2 +- src/array/struct_.rs | 8 +- src/ffi/array.rs | 18 ++- src/ffi/ffi.rs | 312 ++----------------------------------- src/ffi/mod.rs | 1 + src/ffi/schema.rs | 292 ++++++++++++++++++++++++++++++++++ 9 files changed, 329 insertions(+), 311 deletions(-) create mode 100644 src/ffi/schema.rs diff --git a/src/array/binary/ffi.rs b/src/array/binary/ffi.rs index 2771ee9a276..a216be7b421 100644 --- a/src/array/binary/ffi.rs +++ b/src/array/binary/ffi.rs @@ -25,7 +25,7 @@ unsafe impl ToFfi for BinaryArray { unsafe impl FromFfi for BinaryArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.data_type()?; + let data_type = array.field()?.data_type().clone(); let expected = if O::is_large() { DataType::LargeBinary } else { diff --git a/src/array/boolean/ffi.rs b/src/array/boolean/ffi.rs index a3b9b651feb..b141c3a3a69 100644 --- a/src/array/boolean/ffi.rs +++ b/src/array/boolean/ffi.rs @@ -1,5 +1,6 @@ use crate::{ array::{FromFfi, ToFfi}, + datatypes::DataType, ffi, }; @@ -22,6 +23,8 @@ unsafe impl ToFfi for BooleanArray { unsafe impl FromFfi for BooleanArray { fn try_from_ffi(array: A) -> Result { + let data_type = array.field()?.data_type().clone(); + assert_eq!(data_type, DataType::Boolean); let length = array.array().len(); let offset = array.array().offset(); let mut validity = unsafe { array.validity() }?; diff --git a/src/array/list/ffi.rs b/src/array/list/ffi.rs index cd794ff5cca..63d6e8f5863 100644 --- a/src/array/list/ffi.rs +++ b/src/array/list/ffi.rs @@ -24,7 +24,7 @@ unsafe impl ToFfi for ListArray { unsafe impl FromFfi for ListArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.data_type()?; + let data_type = array.field()?.data_type().clone(); let length = array.array().len(); let offset = array.array().offset(); let mut validity = unsafe { array.validity() }?; diff --git a/src/array/primitive/ffi.rs b/src/array/primitive/ffi.rs index 87980219c0c..5168f322b70 100644 --- a/src/array/primitive/ffi.rs +++ b/src/array/primitive/ffi.rs @@ -24,7 +24,7 @@ unsafe impl ToFfi for PrimitiveArray { unsafe impl FromFfi for PrimitiveArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.data_type()?; + let data_type = array.field()?.data_type().clone(); let length = array.array().len(); let offset = array.array().offset(); let mut validity = unsafe { array.validity() }?; diff --git a/src/array/struct_.rs b/src/array/struct_.rs index e1770a12901..86c2b092102 100644 --- a/src/array/struct_.rs +++ b/src/array/struct_.rs @@ -77,8 +77,8 @@ impl StructArray { } impl StructArray { - pub fn get_fields(datatype: &DataType) -> &[Field] { - if let DataType::Struct(fields) = datatype { + pub fn get_fields(data_type: &DataType) -> &[Field] { + if let DataType::Struct(fields) = data_type { fields } else { panic!("Wrong datatype passed to Struct.") @@ -139,8 +139,8 @@ unsafe impl ToFfi for StructArray { unsafe impl FromFfi for StructArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.data_type()?; - let fields = Self::get_fields(&data_type).to_vec(); + let field = array.field()?; + let fields = Self::get_fields(field.data_type()).to_vec(); let length = array.array().len(); let offset = array.array().offset(); diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 0929aaebc97..6c6ead2b4b2 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -32,8 +32,8 @@ use crate::{ /// * the data type is not supported /// * the interface is not valid (e.g. a null pointer) pub fn try_from(array: A) -> Result> { - let data_type = array.data_type()?; - let array: Box = match data_type { + let field = array.field()?; + let array: Box = match field.data_type() { DataType::Boolean => Box::new(BooleanArray::try_from_ffi(array)?), DataType::Int8 => Box::new(PrimitiveArray::::try_from_ffi(array)?), DataType::Int16 => Box::new(PrimitiveArray::::try_from_ffi(array)?), @@ -199,4 +199,18 @@ mod tests { test_round_trip(array) } + + /* + #[test] + fn test_dict() -> Result<()> { + let data = vec![Some("a"), Some("a"), None, Some("b")]; + + let mut array = MutableDictionaryArray::>::new(); + array.try_extend(data)?; + + let array: DictionaryArray = array.into(); + + test_round_trip(array) + } + */ } diff --git a/src/ffi/ffi.rs b/src/ffi/ffi.rs index f4c1f776691..97922e626e8 100644 --- a/src/ffi/ffi.rs +++ b/src/ffi/ffi.rs @@ -15,13 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::{ - ffi::CStr, - ffi::CString, - ptr::{self, NonNull}, - sync::Arc, -}; +use std::{ptr::NonNull, sync::Arc}; +use super::schema::{to_field, Ffi_ArrowSchema}; use crate::{ array::{buffers_children, Array}, bitmap::{utils::bytes_for, Bitmap}, @@ -29,297 +25,11 @@ use crate::{ bytes::{Bytes, Deallocation}, Buffer, }, - datatypes::{DataType, Field, IntervalUnit, TimeUnit}, + datatypes::{DataType, Field}, error::{ArrowError, Result}, types::NativeType, }; -#[allow(dead_code)] -struct SchemaPrivateData { - field: Field, - children_ptr: Box<[*mut Ffi_ArrowSchema]>, -} - -/// ABI-compatible struct for `ArrowSchema` from C Data Interface -/// See -/// This was created by bindgen -#[repr(C)] -#[derive(Debug)] -pub struct Ffi_ArrowSchema { - format: *const ::std::os::raw::c_char, - name: *const ::std::os::raw::c_char, - metadata: *const ::std::os::raw::c_char, - flags: i64, - n_children: i64, - children: *mut *mut Ffi_ArrowSchema, - dictionary: *mut Ffi_ArrowSchema, - release: ::std::option::Option, - private_data: *mut ::std::os::raw::c_void, -} - -// callback used to drop [Ffi_ArrowSchema] when it is exported. -unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { - if schema.is_null() { - return; - } - let schema = &mut *schema; - - // take ownership back to release it. - CString::from_raw(schema.format as *mut std::os::raw::c_char); - CString::from_raw(schema.name as *mut std::os::raw::c_char); - let private = Box::from_raw(schema.private_data as *mut SchemaPrivateData); - for child in private.children_ptr.iter() { - let _ = Box::from_raw(*child); - } - - schema.release = None; -} - -impl Ffi_ArrowSchema { - /// create a new [`Ffi_ArrowSchema`]. This fails if the fields' [`DataType`] is not supported. - fn try_new(field: Field) -> Result { - let format = to_format(field.data_type())?; - let name = field.name().clone(); - - // allocate (and hold) the children - let children_vec = match field.data_type() { - DataType::List(field) => { - vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] - } - DataType::LargeList(field) => { - vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] - } - DataType::Struct(fields) => fields - .iter() - .map(|field| Ok(Box::new(Ffi_ArrowSchema::try_new(field.clone())?))) - .collect::>>()?, - _ => vec![], - }; - // note: this cannot be done along with the above because the above is fallible and this op leaks. - let children_ptr = children_vec - .into_iter() - .map(Box::into_raw) - .collect::>(); - let n_children = children_ptr.len() as i64; - - let flags = field.is_nullable() as i64 * 2; - - let mut private = Box::new(SchemaPrivateData { - field, - children_ptr, - }); - - // - Ok(Ffi_ArrowSchema { - format: CString::new(format).unwrap().into_raw(), - name: CString::new(name).unwrap().into_raw(), - metadata: std::ptr::null_mut(), - flags, - n_children, - children: private.children_ptr.as_mut_ptr(), - dictionary: std::ptr::null_mut(), - release: Some(c_release_schema), - private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void, - }) - } - - /// create an empty [Ffi_ArrowSchema] - fn empty() -> Self { - Self { - format: std::ptr::null_mut(), - name: std::ptr::null_mut(), - metadata: std::ptr::null_mut(), - flags: 0, - n_children: 0, - children: ptr::null_mut(), - dictionary: std::ptr::null_mut(), - release: None, - private_data: std::ptr::null_mut(), - } - } - - /// returns the format of this schema. - pub fn format(&self) -> &str { - assert!(!self.format.is_null()); - // safe because the lifetime of `self.format` equals `self` - unsafe { CStr::from_ptr(self.format) } - .to_str() - .expect("The external API has a non-utf8 as format") - } - - /// returns the name of this schema. - pub fn name(&self) -> &str { - assert!(!self.name.is_null()); - // safe because the lifetime of `self.name` equals `self` - unsafe { CStr::from_ptr(self.name) }.to_str().unwrap() - } - - pub fn child(&self, index: usize) -> &Self { - assert!(index < self.n_children as usize); - assert!(!self.name.is_null()); - unsafe { self.children.add(index).as_ref().unwrap().as_ref().unwrap() } - } - - pub fn nullable(&self) -> bool { - (self.flags / 2) & 1 == 1 - } -} - -impl Drop for Ffi_ArrowSchema { - fn drop(&mut self) { - match self.release { - None => (), - Some(release) => unsafe { release(self) }, - }; - } -} - -/// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings -fn to_field(schema: &Ffi_ArrowSchema) -> Result { - let data_type = match schema.format() { - "n" => DataType::Null, - "b" => DataType::Boolean, - "c" => DataType::Int8, - "C" => DataType::UInt8, - "s" => DataType::Int16, - "S" => DataType::UInt16, - "i" => DataType::Int32, - "I" => DataType::UInt32, - "l" => DataType::Int64, - "L" => DataType::UInt64, - "e" => DataType::Float16, - "f" => DataType::Float32, - "g" => DataType::Float64, - "z" => DataType::Binary, - "Z" => DataType::LargeBinary, - "u" => DataType::Utf8, - "U" => DataType::LargeUtf8, - "tdD" => DataType::Date32, - "tdm" => DataType::Date64, - "tts" => DataType::Time32(TimeUnit::Second), - "ttm" => DataType::Time32(TimeUnit::Millisecond), - "ttu" => DataType::Time64(TimeUnit::Microsecond), - "ttn" => DataType::Time64(TimeUnit::Nanosecond), - "tDs" => DataType::Duration(TimeUnit::Second), - "tDm" => DataType::Duration(TimeUnit::Millisecond), - "tDu" => DataType::Duration(TimeUnit::Microsecond), - "tDn" => DataType::Duration(TimeUnit::Nanosecond), - "tiM" => DataType::Interval(IntervalUnit::YearMonth), - "tiD" => DataType::Interval(IntervalUnit::DayTime), - "+l" => { - let child = schema.child(0); - DataType::List(Box::new(to_field(child)?)) - } - "+L" => { - let child = schema.child(0); - DataType::LargeList(Box::new(to_field(child)?)) - } - "+s" => { - let children = (0..schema.n_children as usize) - .map(|x| to_field(schema.child(x))) - .collect::>>()?; - DataType::Struct(children) - } - other => { - let parts = other.split(':').collect::>(); - if parts.len() == 2 && parts[0] == "tss" { - DataType::Timestamp(TimeUnit::Second, Some(parts[1].to_string())) - } else if parts.len() == 2 && parts[0] == "tsm" { - DataType::Timestamp(TimeUnit::Millisecond, Some(parts[1].to_string())) - } else if parts.len() == 2 && parts[0] == "tsu" { - DataType::Timestamp(TimeUnit::Microsecond, Some(parts[1].to_string())) - } else if parts.len() == 2 && parts[0] == "tsn" { - DataType::Timestamp(TimeUnit::Nanosecond, Some(parts[1].to_string())) - } else if parts.len() == 2 && parts[0] == "d" { - let parts = parts[1].split(',').collect::>(); - if parts.len() < 2 || parts.len() > 3 { - return Err(ArrowError::Ffi( - "Decimal must contain 2 or 3 comma-separated values".to_string(), - )); - }; - if parts.len() == 3 { - let bit_width = parts[0].parse::().map_err(|_| { - ArrowError::Ffi("Decimal bit width is not a valid integer".to_string()) - })?; - if bit_width != 128 { - return Err(ArrowError::Ffi("Decimal256 is not supported".to_string())); - } - } - let precision = parts[0].parse::().map_err(|_| { - ArrowError::Ffi("Decimal precision is not a valid integer".to_string()) - })?; - let scale = parts[1].parse::().map_err(|_| { - ArrowError::Ffi("Decimal scale is not a valid integer".to_string()) - })?; - DataType::Decimal(precision, scale) - } else { - return Err(ArrowError::Ffi(format!( - "The datatype \"{}\" is still not supported in Rust implementation", - other - ))); - } - } - }; - Ok(Field::new(schema.name(), data_type, schema.nullable())) -} - -/// the inverse of [to_datatype] -fn to_format(data_type: &DataType) -> Result { - Ok(match data_type { - DataType::Null => "n", - DataType::Boolean => "b", - DataType::Int8 => "c", - DataType::UInt8 => "C", - DataType::Int16 => "s", - DataType::UInt16 => "S", - DataType::Int32 => "i", - DataType::UInt32 => "I", - DataType::Int64 => "l", - DataType::UInt64 => "L", - DataType::Float16 => "e", - DataType::Float32 => "f", - DataType::Float64 => "g", - DataType::Binary => "z", - DataType::LargeBinary => "Z", - DataType::Utf8 => "u", - DataType::LargeUtf8 => "U", - DataType::Date32 => "tdD", - DataType::Date64 => "tdm", - DataType::Time32(TimeUnit::Second) => "tts", - DataType::Time32(TimeUnit::Millisecond) => "ttm", - DataType::Time64(TimeUnit::Microsecond) => "ttu", - DataType::Time64(TimeUnit::Nanosecond) => "ttn", - DataType::Duration(TimeUnit::Second) => "tDs", - DataType::Duration(TimeUnit::Millisecond) => "tDm", - DataType::Duration(TimeUnit::Microsecond) => "tDu", - DataType::Duration(TimeUnit::Nanosecond) => "tDn", - DataType::Interval(IntervalUnit::YearMonth) => "tiM", - DataType::Interval(IntervalUnit::DayTime) => "tiD", - DataType::Timestamp(unit, tz) => { - let unit = match unit { - TimeUnit::Second => "s", - TimeUnit::Millisecond => "m", - TimeUnit::Microsecond => "u", - TimeUnit::Nanosecond => "n", - }; - return Ok(format!( - "ts{}:{}", - unit, - tz.as_ref().map(|x| x.as_ref()).unwrap_or("") - )); - } - DataType::Decimal(precision, scale) => return Ok(format!("d:{},{}", precision, scale)), - DataType::List(_) => "+l", - DataType::LargeList(_) => "+L", - DataType::Struct(_) => "+s", - DataType::FixedSizeBinary(size) => return Ok(format!("w{}", size)), - DataType::FixedSizeList(_, size) => return Ok(format!("+w:{}", size)), - DataType::Union(_) => todo!(), - _ => todo!(), - } - .to_string()) -} - /// ABI-compatible struct for ArrowArray from C Data Interface /// See /// This was created by bindgen @@ -573,11 +283,9 @@ fn create_child( assert!(!array.children.is_null()); unsafe { let arr_ptr = *array.children.add(index); - let schema_ptr = *schema.children.add(index); + let schema_ptr = schema.child(index); assert!(!arr_ptr.is_null()); - assert!(!schema_ptr.is_null()); let arr_ptr = &*arr_ptr; - let schema_ptr = &*schema_ptr; Ok(ArrowArrayChild::from_raw(arr_ptr, schema_ptr, parent)) } } @@ -608,7 +316,7 @@ pub trait ArrowArrayRef { // +1 to ignore null bitmap create_buffer::( self.array(), - &self.data_type()?, + self.field()?.data_type(), self.deallocation(), index + 1, ) @@ -629,7 +337,7 @@ pub trait ArrowArrayRef { fn parent(&self) -> &Arc; fn array(&self) -> &Ffi_ArrowArray; fn schema(&self) -> &Ffi_ArrowSchema; - fn data_type(&self) -> Result; + fn field(&self) -> Result; } /// Struct used to move an Array from and to the C Data Interface. @@ -659,8 +367,8 @@ pub struct ArrowArray { impl ArrowArrayRef for Arc { /// the data_type as declared in the schema - fn data_type(&self) -> Result { - to_field(&self.schema).map(|x| x.data_type().clone()) + fn field(&self) -> Result { + to_field(&self.schema) } fn parent(&self) -> &Arc { @@ -685,8 +393,8 @@ pub struct ArrowArrayChild<'a> { impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { /// the data_type as declared in the schema - fn data_type(&self) -> Result { - to_field(self.schema).map(|x| x.data_type().clone()) + fn field(&self) -> Result { + to_field(self.schema) } fn parent(&self) -> &Arc { diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 6d3fad44ea3..19fa83a8fbf 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,6 +3,7 @@ mod array; #[allow(clippy::module_inception)] mod ffi; +mod schema; pub use array::try_from; pub use ffi::{create_empty, export_to_c, ArrowArray, ArrowArrayRef}; diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs new file mode 100644 index 00000000000..3cdb3798ff3 --- /dev/null +++ b/src/ffi/schema.rs @@ -0,0 +1,292 @@ +use std::{ffi::CStr, ffi::CString, ptr}; + +use crate::{ + datatypes::{DataType, Field, IntervalUnit, TimeUnit}, + error::{ArrowError, Result}, +}; + +#[allow(dead_code)] +struct SchemaPrivateData { + field: Field, + children_ptr: Box<[*mut Ffi_ArrowSchema]>, +} + +/// ABI-compatible struct for `ArrowSchema` from C Data Interface +/// See +/// This was created by bindgen +#[repr(C)] +#[derive(Debug)] +pub struct Ffi_ArrowSchema { + format: *const ::std::os::raw::c_char, + name: *const ::std::os::raw::c_char, + metadata: *const ::std::os::raw::c_char, + flags: i64, + n_children: i64, + children: *mut *mut Ffi_ArrowSchema, + dictionary: *mut Ffi_ArrowSchema, + release: ::std::option::Option, + private_data: *mut ::std::os::raw::c_void, +} + +// callback used to drop [Ffi_ArrowSchema] when it is exported. +unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { + if schema.is_null() { + return; + } + let schema = &mut *schema; + + // take ownership back to release it. + CString::from_raw(schema.format as *mut std::os::raw::c_char); + CString::from_raw(schema.name as *mut std::os::raw::c_char); + let private = Box::from_raw(schema.private_data as *mut SchemaPrivateData); + for child in private.children_ptr.iter() { + let _ = Box::from_raw(*child); + } + + schema.release = None; +} + +impl Ffi_ArrowSchema { + /// create a new [`Ffi_ArrowSchema`]. This fails if the fields' [`DataType`] is not supported. + pub fn try_new(field: Field) -> Result { + let format = to_format(field.data_type())?; + let name = field.name().clone(); + + // allocate (and hold) the children + let children_vec = match field.data_type() { + DataType::List(field) => { + vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] + } + DataType::LargeList(field) => { + vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] + } + DataType::Struct(fields) => fields + .iter() + .map(|field| Ok(Box::new(Ffi_ArrowSchema::try_new(field.clone())?))) + .collect::>>()?, + _ => vec![], + }; + // note: this cannot be done along with the above because the above is fallible and this op leaks. + let children_ptr = children_vec + .into_iter() + .map(Box::into_raw) + .collect::>(); + let n_children = children_ptr.len() as i64; + + let flags = field.is_nullable() as i64 * 2; + + let mut private = Box::new(SchemaPrivateData { + field, + children_ptr, + }); + + // + Ok(Ffi_ArrowSchema { + format: CString::new(format).unwrap().into_raw(), + name: CString::new(name).unwrap().into_raw(), + metadata: std::ptr::null_mut(), + flags, + n_children, + children: private.children_ptr.as_mut_ptr(), + dictionary: std::ptr::null_mut(), + release: Some(c_release_schema), + private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void, + }) + } + + /// create an empty [Ffi_ArrowSchema] + pub fn empty() -> Self { + Self { + format: std::ptr::null_mut(), + name: std::ptr::null_mut(), + metadata: std::ptr::null_mut(), + flags: 0, + n_children: 0, + children: ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } + + /// returns the format of this schema. + pub fn format(&self) -> &str { + assert!(!self.format.is_null()); + // safe because the lifetime of `self.format` equals `self` + unsafe { CStr::from_ptr(self.format) } + .to_str() + .expect("The external API has a non-utf8 as format") + } + + /// returns the name of this schema. + pub fn name(&self) -> &str { + assert!(!self.name.is_null()); + // safe because the lifetime of `self.name` equals `self` + unsafe { CStr::from_ptr(self.name) }.to_str().unwrap() + } + + pub fn child(&self, index: usize) -> &'static Self { + assert!(index < self.n_children as usize); + assert!(!self.name.is_null()); + unsafe { self.children.add(index).as_ref().unwrap().as_ref().unwrap() } + } + + pub fn nullable(&self) -> bool { + (self.flags / 2) & 1 == 1 + } +} + +impl Drop for Ffi_ArrowSchema { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +pub fn to_field(schema: &Ffi_ArrowSchema) -> Result { + let data_type = match schema.format() { + "n" => DataType::Null, + "b" => DataType::Boolean, + "c" => DataType::Int8, + "C" => DataType::UInt8, + "s" => DataType::Int16, + "S" => DataType::UInt16, + "i" => DataType::Int32, + "I" => DataType::UInt32, + "l" => DataType::Int64, + "L" => DataType::UInt64, + "e" => DataType::Float16, + "f" => DataType::Float32, + "g" => DataType::Float64, + "z" => DataType::Binary, + "Z" => DataType::LargeBinary, + "u" => DataType::Utf8, + "U" => DataType::LargeUtf8, + "tdD" => DataType::Date32, + "tdm" => DataType::Date64, + "tts" => DataType::Time32(TimeUnit::Second), + "ttm" => DataType::Time32(TimeUnit::Millisecond), + "ttu" => DataType::Time64(TimeUnit::Microsecond), + "ttn" => DataType::Time64(TimeUnit::Nanosecond), + "tDs" => DataType::Duration(TimeUnit::Second), + "tDm" => DataType::Duration(TimeUnit::Millisecond), + "tDu" => DataType::Duration(TimeUnit::Microsecond), + "tDn" => DataType::Duration(TimeUnit::Nanosecond), + "tiM" => DataType::Interval(IntervalUnit::YearMonth), + "tiD" => DataType::Interval(IntervalUnit::DayTime), + "+l" => { + let child = schema.child(0); + DataType::List(Box::new(to_field(child)?)) + } + "+L" => { + let child = schema.child(0); + DataType::LargeList(Box::new(to_field(child)?)) + } + "+s" => { + let children = (0..schema.n_children as usize) + .map(|x| to_field(schema.child(x))) + .collect::>>()?; + DataType::Struct(children) + } + other => { + let parts = other.split(':').collect::>(); + if parts.len() == 2 && parts[0] == "tss" { + DataType::Timestamp(TimeUnit::Second, Some(parts[1].to_string())) + } else if parts.len() == 2 && parts[0] == "tsm" { + DataType::Timestamp(TimeUnit::Millisecond, Some(parts[1].to_string())) + } else if parts.len() == 2 && parts[0] == "tsu" { + DataType::Timestamp(TimeUnit::Microsecond, Some(parts[1].to_string())) + } else if parts.len() == 2 && parts[0] == "tsn" { + DataType::Timestamp(TimeUnit::Nanosecond, Some(parts[1].to_string())) + } else if parts.len() == 2 && parts[0] == "d" { + let parts = parts[1].split(',').collect::>(); + if parts.len() < 2 || parts.len() > 3 { + return Err(ArrowError::Ffi( + "Decimal must contain 2 or 3 comma-separated values".to_string(), + )); + }; + if parts.len() == 3 { + let bit_width = parts[0].parse::().map_err(|_| { + ArrowError::Ffi("Decimal bit width is not a valid integer".to_string()) + })?; + if bit_width != 128 { + return Err(ArrowError::Ffi("Decimal256 is not supported".to_string())); + } + } + let precision = parts[0].parse::().map_err(|_| { + ArrowError::Ffi("Decimal precision is not a valid integer".to_string()) + })?; + let scale = parts[1].parse::().map_err(|_| { + ArrowError::Ffi("Decimal scale is not a valid integer".to_string()) + })?; + DataType::Decimal(precision, scale) + } else { + return Err(ArrowError::Ffi(format!( + "The datatype \"{}\" is still not supported in Rust implementation", + other + ))); + } + } + }; + Ok(Field::new(schema.name(), data_type, schema.nullable())) +} + +/// the inverse of [to_field] +fn to_format(data_type: &DataType) -> Result { + Ok(match data_type { + DataType::Null => "n", + DataType::Boolean => "b", + DataType::Int8 => "c", + DataType::UInt8 => "C", + DataType::Int16 => "s", + DataType::UInt16 => "S", + DataType::Int32 => "i", + DataType::UInt32 => "I", + DataType::Int64 => "l", + DataType::UInt64 => "L", + DataType::Float16 => "e", + DataType::Float32 => "f", + DataType::Float64 => "g", + DataType::Binary => "z", + DataType::LargeBinary => "Z", + DataType::Utf8 => "u", + DataType::LargeUtf8 => "U", + DataType::Date32 => "tdD", + DataType::Date64 => "tdm", + DataType::Time32(TimeUnit::Second) => "tts", + DataType::Time32(TimeUnit::Millisecond) => "ttm", + DataType::Time64(TimeUnit::Microsecond) => "ttu", + DataType::Time64(TimeUnit::Nanosecond) => "ttn", + DataType::Duration(TimeUnit::Second) => "tDs", + DataType::Duration(TimeUnit::Millisecond) => "tDm", + DataType::Duration(TimeUnit::Microsecond) => "tDu", + DataType::Duration(TimeUnit::Nanosecond) => "tDn", + DataType::Interval(IntervalUnit::YearMonth) => "tiM", + DataType::Interval(IntervalUnit::DayTime) => "tiD", + DataType::Timestamp(unit, tz) => { + let unit = match unit { + TimeUnit::Second => "s", + TimeUnit::Millisecond => "m", + TimeUnit::Microsecond => "u", + TimeUnit::Nanosecond => "n", + }; + return Ok(format!( + "ts{}:{}", + unit, + tz.as_ref().map(|x| x.as_ref()).unwrap_or("") + )); + } + DataType::Decimal(precision, scale) => return Ok(format!("d:{},{}", precision, scale)), + DataType::List(_) => "+l", + DataType::LargeList(_) => "+L", + DataType::Struct(_) => "+s", + DataType::FixedSizeBinary(size) => return Ok(format!("w{}", size)), + DataType::FixedSizeList(_, size) => return Ok(format!("+w:{}", size)), + DataType::Union(_) => todo!(), + DataType::Dictionary(index, _) => return to_format(index.as_ref()), + _ => todo!(), + } + .to_string()) +} From 522c7c7d349b8eaf71683a46409a7f0d9d6c377c Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 10 Aug 2021 18:05:07 +0000 Subject: [PATCH 2/3] Added FFI consume of dict. --- src/array/dictionary/ffi.rs | 39 +++++++++++++++++++++++++++++++++++++ src/array/dictionary/mod.rs | 14 ++----------- src/ffi/array.rs | 11 ++++++++++- src/ffi/ffi.rs | 20 ++++++++++++++++++- src/ffi/schema.rs | 24 ++++++++++++++++++++--- 5 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 src/array/dictionary/ffi.rs diff --git a/src/array/dictionary/ffi.rs b/src/array/dictionary/ffi.rs new file mode 100644 index 00000000000..8fe0e49917b --- /dev/null +++ b/src/array/dictionary/ffi.rs @@ -0,0 +1,39 @@ +use crate::{ + array::{FromFfi, PrimitiveArray, ToFfi}, + error::Result, + ffi, +}; + +use super::{DictionaryArray, DictionaryKey}; + +unsafe impl ToFfi for DictionaryArray { + fn buffers(&self) -> Vec>> { + self.keys.buffers() + } + + #[inline] + fn offset(&self) -> usize { + self.offset + } +} + +unsafe impl FromFfi for DictionaryArray { + fn try_from_ffi(array: A) -> Result { + // keys: similar to PrimitiveArray, but the datatype is the inner one + let length = array.array().len(); + let offset = array.array().offset(); + let mut validity = unsafe { array.validity() }?; + let mut values = unsafe { array.buffer::(0) }?; + + if offset > 0 { + values = values.slice(offset, length); + validity = validity.map(|x| x.slice(offset, length)) + } + let keys = PrimitiveArray::::from_data(K::DATA_TYPE, values, validity); + // values + let values = array.dictionary()?.unwrap(); + let values = ffi::try_from(values)?.into(); + + Ok(DictionaryArray::::from_data(keys, values)) + } +} diff --git a/src/array/dictionary/mod.rs b/src/array/dictionary/mod.rs index 21bc374a20f..d63d82bccb2 100644 --- a/src/array/dictionary/mod.rs +++ b/src/array/dictionary/mod.rs @@ -6,12 +6,13 @@ use crate::{ types::{NativeType, NaturalDataType}, }; +mod ffi; mod iterator; mod mutable; pub use iterator::*; pub use mutable::*; -use super::{ffi::ToFfi, new_empty_array, primitive::PrimitiveArray, Array}; +use super::{new_empty_array, primitive::PrimitiveArray, Array}; /// Trait denoting [`NativeType`]s that can be used as keys of a dictionary. pub trait DictionaryKey: NativeType + NaturalDataType + num::NumCast + num::FromPrimitive {} @@ -143,14 +144,3 @@ where write!(f, "}}") } } - -unsafe impl ToFfi for DictionaryArray { - fn buffers(&self) -> Vec>> { - vec![self.keys.validity().as_ref().map(|x| x.as_ptr())] - } - - #[inline] - fn offset(&self) -> usize { - self.offset - } -} diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 6c6ead2b4b2..cd1e1852f7f 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -22,7 +22,7 @@ use crate::array::{BooleanArray, FromFfi}; use crate::error::{ArrowError, Result}; use crate::types::days_ms; use crate::{ - array::{Array, BinaryArray, ListArray, PrimitiveArray, StructArray, Utf8Array}, + array::*, datatypes::{DataType, IntervalUnit}, }; @@ -66,6 +66,15 @@ pub fn try_from(array: A) -> Result> { DataType::List(_) => Box::new(ListArray::::try_from_ffi(array)?), DataType::LargeList(_) => Box::new(ListArray::::try_from_ffi(array)?), DataType::Struct(_) => Box::new(StructArray::try_from_ffi(array)?), + DataType::Dictionary(keys, _) => match keys.as_ref() { + DataType::Int64 => Box::new(DictionaryArray::::try_from_ffi(array)?), + other => { + return Err(ArrowError::NotYetImplemented(format!( + "Reading dictionary of keys \"{}\" is not yet supported.", + other + ))) + } + }, data_type => { return Err(ArrowError::NotYetImplemented(format!( "Reading DataType \"{}\" is not yet supported.", diff --git a/src/ffi/ffi.rs b/src/ffi/ffi.rs index 97922e626e8..fca89e4ae69 100644 --- a/src/ffi/ffi.rs +++ b/src/ffi/ffi.rs @@ -280,7 +280,6 @@ fn create_child( ) -> Result> { assert!(index < array.n_children as usize); assert!(!array.children.is_null()); - assert!(!array.children.is_null()); unsafe { let arr_ptr = *array.children.add(index); let schema_ptr = schema.child(index); @@ -290,6 +289,21 @@ fn create_child( } } +fn create_dictionary( + array: &Ffi_ArrowArray, + schema: &Ffi_ArrowSchema, + parent: Arc, +) -> Result>> { + let schema = schema.dictionary(); + if let Some(schema) = schema { + assert!(!array.dictionary.is_null()); + let array = unsafe { &*array.dictionary }; + Ok(Some(ArrowArrayChild::from_raw(array, schema, parent))) + } else { + Ok(None) + } +} + pub trait ArrowArrayRef { fn deallocation(&self) -> Deallocation { Deallocation::Foreign(self.parent().clone()) @@ -334,6 +348,10 @@ pub trait ArrowArrayRef { create_child(self.array(), self.schema(), self.parent().clone(), index) } + fn dictionary(&self) -> Result> { + create_dictionary(self.array(), self.schema(), self.parent().clone()) + } + fn parent(&self) -> &Arc; fn array(&self) -> &Ffi_ArrowArray; fn schema(&self) -> &Ffi_ArrowSchema; diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index 3cdb3798ff3..3261432226e 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -131,6 +131,13 @@ impl Ffi_ArrowSchema { unsafe { self.children.add(index).as_ref().unwrap().as_ref().unwrap() } } + pub fn dictionary(&self) -> Option<&'static Self> { + if self.dictionary.is_null() { + return None; + }; + Some(unsafe { self.dictionary.as_ref().unwrap() }) + } + pub fn nullable(&self) -> bool { (self.flags / 2) & 1 == 1 } @@ -146,7 +153,19 @@ impl Drop for Ffi_ArrowSchema { } pub fn to_field(schema: &Ffi_ArrowSchema) -> Result { - let data_type = match schema.format() { + let dictionary = schema.dictionary(); + let data_type = if let Some(dictionary) = dictionary { + let indices_data_type = to_data_type(schema)?; + let values_data_type = to_data_type(dictionary)?; + DataType::Dictionary(Box::new(indices_data_type), Box::new(values_data_type)) + } else { + to_data_type(schema)? + }; + Ok(Field::new(schema.name(), data_type, schema.nullable())) +} + +fn to_data_type(schema: &Ffi_ArrowSchema) -> Result { + Ok(match schema.format() { "n" => DataType::Null, "b" => DataType::Boolean, "c" => DataType::Int8, @@ -229,8 +248,7 @@ pub fn to_field(schema: &Ffi_ArrowSchema) -> Result { ))); } } - }; - Ok(Field::new(schema.name(), data_type, schema.nullable())) + }) } /// the inverse of [to_field] From f774364d0b4b8d8547f0180b8d601ce9d712e307 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 10 Aug 2021 20:28:03 +0000 Subject: [PATCH 3/3] Added support for FFI of dictionary-encoded arrays. --- .../tests/test_sql.py | 16 +++++++- src/array/ffi.rs | 37 +++++++++++++------ src/array/mod.rs | 2 +- src/ffi/array.rs | 17 ++++----- src/ffi/ffi.rs | 15 ++++++-- src/ffi/schema.rs | 16 +++++++- 6 files changed, 77 insertions(+), 26 deletions(-) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index e6ec3223d0e..5260536342a 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -104,7 +104,7 @@ def test_decimal_roundtrip(self): data = [ round(decimal.Decimal(722.82), 2), round(decimal.Decimal(-934.11), 2), - None + None, ] a = pyarrow.array(data, pyarrow.decimal128(5, 2)) b = arrow_pyarrow_integration_testing.round_trip(a) @@ -179,3 +179,17 @@ def test_list_list_array(self): b.validate(full=True) assert a.to_pylist() == b.to_pylist() assert a.type == b.type + + def test_dict(self): + """ + Python -> Rust -> Python + """ + a = pyarrow.array( + ["a", "a", "b", None, "c"], + pyarrow.dictionary(pyarrow.int64(), pyarrow.utf8()), + ) + b = arrow_pyarrow_integration_testing.round_trip(a) + + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type diff --git a/src/array/ffi.rs b/src/array/ffi.rs index 123d925d36b..e707f98d6b0 100644 --- a/src/array/ffi.rs +++ b/src/array/ffi.rs @@ -29,13 +29,28 @@ pub unsafe trait FromFfi: Sized { macro_rules! ffi_dyn { ($array:expr, $ty:ty) => {{ let array = $array.as_any().downcast_ref::<$ty>().unwrap(); - (array.buffers(), array.children()) + (array.buffers(), array.children(), None) }}; } -type BuffersChildren = (Vec>>, Vec>); +macro_rules! ffi_dict_dyn { + ($array:expr, $ty:ty) => {{ + let array = $array.as_any().downcast_ref::<$ty>().unwrap(); + ( + array.buffers(), + array.children(), + Some(array.values().clone()), + ) + }}; +} + +type BuffersChildren = ( + Vec>>, + Vec>, + Option>, +); -pub fn buffers_children(array: &dyn Array) -> BuffersChildren { +pub fn buffers_children_dictionary(array: &dyn Array) -> BuffersChildren { match array.data_type() { DataType::Null => ffi_dyn!(array, NullArray), DataType::Boolean => ffi_dyn!(array, BooleanArray), @@ -72,14 +87,14 @@ pub fn buffers_children(array: &dyn Array) -> BuffersChildren { DataType::Struct(_) => ffi_dyn!(array, StructArray), DataType::Union(_) => unimplemented!(), DataType::Dictionary(key_type, _) => match key_type.as_ref() { - DataType::Int8 => ffi_dyn!(array, DictionaryArray::), - DataType::Int16 => ffi_dyn!(array, DictionaryArray::), - DataType::Int32 => ffi_dyn!(array, DictionaryArray::), - DataType::Int64 => ffi_dyn!(array, DictionaryArray::), - DataType::UInt8 => ffi_dyn!(array, DictionaryArray::), - DataType::UInt16 => ffi_dyn!(array, DictionaryArray::), - DataType::UInt32 => ffi_dyn!(array, DictionaryArray::), - DataType::UInt64 => ffi_dyn!(array, DictionaryArray::), + DataType::Int8 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::Int16 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::Int32 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::Int64 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::UInt8 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::UInt16 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::UInt32 => ffi_dict_dyn!(array, DictionaryArray::), + DataType::UInt64 => ffi_dict_dyn!(array, DictionaryArray::), _ => unreachable!(), }, } diff --git a/src/array/mod.rs b/src/array/mod.rs index 772c49b46c8..3505a6204a6 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -401,7 +401,7 @@ pub use specification::Offset; pub use struct_::StructArray; pub use utf8::{MutableUtf8Array, Utf8Array, Utf8ValuesIter}; -pub(crate) use self::ffi::buffers_children; +pub(crate) use self::ffi::buffers_children_dictionary; pub use self::ffi::FromFfi; pub use self::ffi::ToFfi; diff --git a/src/ffi/array.rs b/src/ffi/array.rs index cd1e1852f7f..88575b60f50 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -67,13 +67,15 @@ pub fn try_from(array: A) -> Result> { DataType::LargeList(_) => Box::new(ListArray::::try_from_ffi(array)?), DataType::Struct(_) => Box::new(StructArray::try_from_ffi(array)?), DataType::Dictionary(keys, _) => match keys.as_ref() { + DataType::Int8 => Box::new(DictionaryArray::::try_from_ffi(array)?), + DataType::Int16 => Box::new(DictionaryArray::::try_from_ffi(array)?), + DataType::Int32 => Box::new(DictionaryArray::::try_from_ffi(array)?), DataType::Int64 => Box::new(DictionaryArray::::try_from_ffi(array)?), - other => { - return Err(ArrowError::NotYetImplemented(format!( - "Reading dictionary of keys \"{}\" is not yet supported.", - other - ))) - } + DataType::UInt8 => Box::new(DictionaryArray::::try_from_ffi(array)?), + DataType::UInt16 => Box::new(DictionaryArray::::try_from_ffi(array)?), + DataType::UInt32 => Box::new(DictionaryArray::::try_from_ffi(array)?), + DataType::UInt64 => Box::new(DictionaryArray::::try_from_ffi(array)?), + _ => unreachable!(), }, data_type => { return Err(ArrowError::NotYetImplemented(format!( @@ -89,7 +91,6 @@ pub fn try_from(array: A) -> Result> { #[cfg(test)] mod tests { use super::*; - use crate::array::*; use crate::datatypes::TimeUnit; use crate::{error::Result, ffi}; use std::sync::Arc; @@ -209,7 +210,6 @@ mod tests { test_round_trip(array) } - /* #[test] fn test_dict() -> Result<()> { let data = vec![Some("a"), Some("a"), None, Some("b")]; @@ -221,5 +221,4 @@ mod tests { test_round_trip(array) } - */ } diff --git a/src/ffi/ffi.rs b/src/ffi/ffi.rs index fca89e4ae69..756b0b04da8 100644 --- a/src/ffi/ffi.rs +++ b/src/ffi/ffi.rs @@ -19,7 +19,7 @@ use std::{ptr::NonNull, sync::Arc}; use super::schema::{to_field, Ffi_ArrowSchema}; use crate::{ - array::{buffers_children, Array}, + array::{buffers_children_dictionary, Array}, bitmap::{utils::bytes_for, Bitmap}, buffer::{ bytes::{Bytes, Deallocation}, @@ -75,6 +75,10 @@ unsafe extern "C" fn c_release_array(array: *mut Ffi_ArrowArray) { let _ = Box::from_raw(*child); } + if let Some(ptr) = private.dictionary_ptr { + let _ = Box::from_raw(ptr); + } + array.release = None; } @@ -83,6 +87,7 @@ struct PrivateData { array: Arc, buffers_ptr: Box<[*const std::os::raw::c_void]>, children_ptr: Box<[*mut Ffi_ArrowArray]>, + dictionary_ptr: Option<*mut Ffi_ArrowArray>, } impl Ffi_ArrowArray { @@ -91,7 +96,7 @@ impl Ffi_ArrowArray { /// This method releases `buffers`. Consumers of this struct *must* call `release` before /// releasing this struct, or contents in `buffers` leak. fn new(array: Arc) -> Self { - let (buffers, children) = buffers_children(array.as_ref()); + let (buffers, children, dictionary) = buffers_children_dictionary(array.as_ref()); let buffers_ptr = buffers .iter() @@ -109,6 +114,9 @@ impl Ffi_ArrowArray { .collect::>(); let n_children = children_ptr.len() as i64; + let dictionary_ptr = + dictionary.map(|array| Box::into_raw(Box::new(Ffi_ArrowArray::new(array)))); + let length = array.len() as i64; let null_count = array.null_count() as i64; @@ -116,6 +124,7 @@ impl Ffi_ArrowArray { array, buffers_ptr, children_ptr, + dictionary_ptr, }); Self { @@ -126,7 +135,7 @@ impl Ffi_ArrowArray { n_children, buffers: private_data.buffers_ptr.as_mut_ptr(), children: private_data.children_ptr.as_mut_ptr(), - dictionary: std::ptr::null_mut(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), release: Some(c_release_array), private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, } diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index 3261432226e..b9edf4d544d 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -9,6 +9,7 @@ use crate::{ struct SchemaPrivateData { field: Field, children_ptr: Box<[*mut Ffi_ArrowSchema]>, + dictionary: Option<*mut Ffi_ArrowSchema>, } /// ABI-compatible struct for `ArrowSchema` from C Data Interface @@ -43,6 +44,10 @@ unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { let _ = Box::from_raw(*child); } + if let Some(ptr) = private.dictionary { + let _ = Box::from_raw(ptr); + } + schema.release = None; } @@ -75,9 +80,18 @@ impl Ffi_ArrowSchema { let flags = field.is_nullable() as i64 * 2; + let dictionary = if let DataType::Dictionary(_, values) = field.data_type() { + // we do not store field info in the dict values, so can't recover it all :( + let field = Field::new("item", values.as_ref().clone(), true); + Some(Box::new(Ffi_ArrowSchema::try_new(field)?)) + } else { + None + }; + let mut private = Box::new(SchemaPrivateData { field, children_ptr, + dictionary: dictionary.map(Box::into_raw), }); // @@ -88,7 +102,7 @@ impl Ffi_ArrowSchema { flags, n_children, children: private.children_ptr.as_mut_ptr(), - dictionary: std::ptr::null_mut(), + dictionary: private.dictionary.unwrap_or(std::ptr::null_mut()), release: Some(c_release_schema), private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void, })