From b0de02de6e292f58631251cdfeb9cab1bd295d2a Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Fri, 20 Dec 2024 15:43:38 +0000
Subject: [PATCH] add an `into_arrow_with_data_type` method that converts to a
 target Arrow `DataType`

---
 bench-vortex/benches/compress.rs           |   2 +-
 vortex-array/src/array/varbin/canonical.rs |  26 ++-
 vortex-array/src/arrow/datum.rs            |   8 +-
 vortex-array/src/arrow/record_batch.rs     |  20 ++-
 vortex-array/src/canonical.rs              | 193 +++++++++++++++------
 vortex-datafusion/src/memory/stream.rs     |   2 +-
 6 files changed, 176 insertions(+), 75 deletions(-)

diff --git a/bench-vortex/benches/compress.rs b/bench-vortex/benches/compress.rs
index 91bd8c0f2d..0b46c41174 100644
--- a/bench-vortex/benches/compress.rs
+++ b/bench-vortex/benches/compress.rs
@@ -34,7 +34,7 @@ use vortex::error::VortexResult;
 use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
-use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};
+use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};

 use crate::tokio_runtime::TOKIO_RUNTIME;

diff --git a/vortex-array/src/array/varbin/canonical.rs b/vortex-array/src/array/varbin/canonical.rs
index b8245fb11f..4aca9b9ed2 100644
--- a/vortex-array/src/array/varbin/canonical.rs
+++ b/vortex-array/src/array/varbin/canonical.rs
@@ -1,7 +1,7 @@
-use arrow_array::ArrayRef;
+use arrow_array::{Array, ArrayRef};
 use arrow_schema::DataType;
 use vortex_dtype::DType;
-use vortex_error::VortexResult;
+use vortex_error::{vortex_bail, VortexResult};

 use crate::array::varbin::arrow::varbin_to_arrow;
 use crate::array::varbin::VarBinArray;
@@ -23,10 +23,28 @@ impl IntoCanonical for VarBinArray {
         VarBinViewArray::try_from(ArrayData::from_arrow(array, nullable)).map(Canonical::VarBinView)
     }

-    fn into_arrow(self) -> VortexResult<ArrayRef> {
+    fn into_arrow(self) -> VortexResult<ArrayRef>
+    where
+        Self: Sized,
+    {
+        varbin_to_arrow(&self)
+    }
+
+    fn into_arrow_with_data_type(self, data_type: &DataType) -> VortexResult<ArrayRef> {
         // Specialized implementation of `into_arrow` for VarBin since it has a direct
         // Arrow representation.
-        varbin_to_arrow(&self)
+        let array_ref = varbin_to_arrow(&self)?;
+
+        // Note: arrow_cast::cast clones the array, so only call it when a cast is needed.
+        Ok(match data_type {
+            DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {
+                array_ref
+            }
+            DataType::Utf8View | DataType::BinaryView => {
+                arrow_cast::cast(array_ref.as_ref(), data_type)?
+            }
+            _ => vortex_bail!("Unsupported data type: {:?}", data_type),
+        })
     }
 }

diff --git a/vortex-array/src/arrow/datum.rs b/vortex-array/src/arrow/datum.rs
index b0d0ba08a6..8aa11dbd3d 100644
--- a/vortex-array/src/arrow/datum.rs
+++ b/vortex-array/src/arrow/datum.rs
@@ -2,10 +2,10 @@ use arrow_array::{Array, ArrayRef, Datum as ArrowDatum};
 use vortex_error::VortexError;

 use crate::compute::slice;
-use crate::stats::{ArrayStatistics, Stat};
 use crate::{ArrayData, IntoCanonical};

 /// A wrapper around a generic Arrow array that can be used as a Datum in Arrow compute.
+#[derive(Debug)]
 pub struct Datum {
     array: ArrayRef,
     is_scalar: bool,
@@ -15,11 +15,7 @@ impl TryFrom<ArrayData> for Datum {
     type Error = VortexError;

     fn try_from(array: ArrayData) -> Result<Self, Self::Error> {
-        if array
-            .statistics()
-            .get_as::<bool>(Stat::IsConstant)
-            .unwrap_or_default()
-        {
+        if array.is_constant() {
             Ok(Self {
                 array: slice(array, 0, 1)?.into_arrow()?,
                 is_scalar: true,
diff --git a/vortex-array/src/arrow/record_batch.rs b/vortex-array/src/arrow/record_batch.rs
index 25332b127f..61d22929c1 100644
--- a/vortex-array/src/arrow/record_batch.rs
+++ b/vortex-array/src/arrow/record_batch.rs
@@ -1,5 +1,6 @@
-use arrow_array::cast::as_struct_array;
+use arrow_array::cast::AsArray;
 use arrow_array::RecordBatch;
+use arrow_schema::{DataType, Schema};
 use itertools::Itertools;
 use vortex_error::{vortex_err, VortexError, VortexResult};

@@ -41,16 +42,19 @@ impl TryFrom<ArrayData> for RecordBatch {
             vortex_err!("RecordBatch can only be constructed from a Vortex StructArray: {err}")
         })?;

-        RecordBatch::try_from(struct_arr)
+        struct_arr.into_record_batch()
     }
 }

-impl TryFrom<StructArray> for RecordBatch {
-    type Error = VortexError;
+impl StructArray {
+    pub fn into_record_batch(self) -> VortexResult<RecordBatch> {
+        let array_ref = self.into_array().into_arrow()?;
+        Ok(RecordBatch::try_from(array_ref.as_struct())?)
+    }

-    fn try_from(value: StructArray) -> VortexResult<Self> {
-        let array_ref = value.into_canonical()?.into_arrow()?;
-        let struct_array = as_struct_array(array_ref.as_ref());
-        Ok(Self::from(struct_array))
+    pub fn into_record_batch_with_schema(self, schema: &Schema) -> VortexResult<RecordBatch> {
+        let data_type = DataType::Struct(schema.fields.clone());
+        let array_ref = self.into_array().into_arrow_with_data_type(&data_type)?;
+        Ok(RecordBatch::try_from(array_ref.as_struct())?)
     }
 }
diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs
index 8c6fddf98c..c2986c14ac 100644
--- a/vortex-array/src/canonical.rs
+++ b/vortex-array/src/canonical.rs
@@ -4,14 +4,16 @@ use std::sync::Arc;

 use arrow_array::types::*;
 use arrow_array::{
-    Array, ArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray, Date32Array, Date64Array,
-    NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray,
+    new_null_array, Array, ArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray,
+    Date32Array, Date64Array, PrimitiveArray as ArrowPrimitiveArray,
     StructArray as ArrowStructArray, Time32MillisecondArray, Time32SecondArray,
     Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
     TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
 };
 use arrow_buffer::ScalarBuffer;
-use arrow_schema::{Field, FieldRef, Fields};
+use arrow_cast::cast;
+use arrow_schema::{DataType, Field, FieldRef, Fields};
+use itertools::Itertools;
 use vortex_datetime_dtype::{is_temporal_ext_type, TemporalMetadata, TimeUnit};
 use vortex_dtype::{DType, NativePType, PType};
 use vortex_error::{vortex_bail, VortexError, VortexResult};
@@ -27,7 +29,7 @@ use crate::encoding::Encoding;
 use crate::stats::ArrayStatistics;
 use crate::validity::ArrayValidity;
 use crate::variants::{PrimitiveArrayTrait, StructArrayTrait};
-use crate::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, ToArrayData};
+use crate::{ArrayDType, ArrayData, ArrayLen, IntoArrayData};

 /// The set of canonical array encodings, also the set of encodings that can be transferred to
 /// Arrow with zero-copy.
@@ -71,24 +73,29 @@ impl Canonical {
     /// zero copies, while more complex variants such as Struct may require allocations if its child
     /// arrays require decompression.
     pub fn into_arrow(self) -> VortexResult<ArrayRef> {
-        Ok(match self {
-            Canonical::Null(a) => null_to_arrow(a)?,
-            Canonical::Bool(a) => bool_to_arrow(a)?,
-            Canonical::Primitive(a) => primitive_to_arrow(a)?,
-            Canonical::Struct(a) => struct_to_arrow(a)?,
-            Canonical::List(a) => list_to_arrow(a)?,
-            Canonical::VarBinView(a) => varbinview_as_arrow(&a),
+        let data_type = infer_data_type(self.dtype())?;
+        self.into_arrow_with_data_type(&data_type)
+    }
+
+    pub fn into_arrow_with_data_type(self, data_type: &DataType) -> VortexResult<ArrayRef> {
+        match self {
+            Canonical::Null(a) => null_to_arrow(a, data_type),
+            Canonical::Bool(a) => bool_to_arrow(a, data_type),
+            Canonical::Primitive(a) => primitive_to_arrow(a, data_type),
+            Canonical::Struct(a) => struct_to_arrow(a, data_type),
+            Canonical::List(a) => list_to_arrow(a, data_type),
+            Canonical::VarBinView(a) => varbinview_to_arrow(&a, data_type),
             Canonical::Extension(a) => {
                 if is_temporal_ext_type(a.id()) {
-                    temporal_to_arrow(TemporalArray::try_from(a.into_array())?)?
+                    temporal_to_arrow(TemporalArray::try_from(a.into_array())?)
                 } else {
                     // Convert storage array directly into arrow, losing type information
                     // that will let us round-trip.
                     // TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
-                    a.storage().into_arrow()?
+                    a.storage().into_arrow_with_data_type(data_type)
                 }
             }
-        })
+        }
     }
 }

@@ -156,21 +163,27 @@ impl Canonical {
     }
 }

-fn null_to_arrow(null_array: NullArray) -> VortexResult<ArrayRef> {
-    Ok(Arc::new(ArrowNullArray::new(null_array.len())))
+fn null_to_arrow(null_array: NullArray, data_type: &DataType) -> VortexResult<ArrayRef> {
+    Ok(new_null_array(data_type, null_array.len()))
 }

-fn bool_to_arrow(bool_array: BoolArray) -> VortexResult<ArrayRef> {
+fn bool_to_arrow(bool_array: BoolArray, data_type: &DataType) -> VortexResult<ArrayRef> {
+    debug_assert_eq!(data_type, &DataType::Boolean);
     Ok(Arc::new(ArrowBoolArray::new(
         bool_array.boolean_buffer(),
         bool_array.logical_validity().to_null_buffer()?,
     )))
 }

-fn primitive_to_arrow(primitive_array: PrimitiveArray) -> VortexResult<ArrayRef> {
+fn primitive_to_arrow(
+    primitive_array: PrimitiveArray,
+    data_type: &DataType,
+) -> VortexResult<ArrayRef> {
     fn as_arrow_array_primitive<T: ArrowPrimitiveType>(
         array: &PrimitiveArray,
+        data_type: &DataType,
     ) -> VortexResult<Arc<ArrowPrimitiveArray<T>>> {
+        debug_assert_eq!(data_type, &T::DATA_TYPE);
         Ok(Arc::new(ArrowPrimitiveArray::new(
             ScalarBuffer::<T::Native>::new(array.buffer().clone().into_arrow(), 0, array.len()),
             array.logical_validity().to_null_buffer()?,
@@ -178,28 +191,37 @@ fn primitive_to_arrow(primitive_array: PrimitiveArray) -> VortexResult<ArrayRef> {
     }

     Ok(match primitive_array.ptype() {
-        PType::U8 => as_arrow_array_primitive::<UInt8Type>(&primitive_array)?,
-        PType::U16 => as_arrow_array_primitive::<UInt16Type>(&primitive_array)?,
-        PType::U32 => as_arrow_array_primitive::<UInt32Type>(&primitive_array)?,
-        PType::U64 => as_arrow_array_primitive::<UInt64Type>(&primitive_array)?,
-        PType::I8 => as_arrow_array_primitive::<Int8Type>(&primitive_array)?,
-        PType::I16 => as_arrow_array_primitive::<Int16Type>(&primitive_array)?,
-        PType::I32 => as_arrow_array_primitive::<Int32Type>(&primitive_array)?,
-        PType::I64 => as_arrow_array_primitive::<Int64Type>(&primitive_array)?,
-        PType::F16 => as_arrow_array_primitive::<Float16Type>(&primitive_array)?,
-        PType::F32 => as_arrow_array_primitive::<Float32Type>(&primitive_array)?,
-        PType::F64 => as_arrow_array_primitive::<Float64Type>(&primitive_array)?,
+        // TODO: verify that data_type is ignored outside of debug builds
+        PType::U8 => as_arrow_array_primitive::<UInt8Type>(&primitive_array, data_type)?,
+        PType::U16 => as_arrow_array_primitive::<UInt16Type>(&primitive_array, data_type)?,
+        PType::U32 => as_arrow_array_primitive::<UInt32Type>(&primitive_array, data_type)?,
+        PType::U64 => as_arrow_array_primitive::<UInt64Type>(&primitive_array, data_type)?,
+        PType::I8 => as_arrow_array_primitive::<Int8Type>(&primitive_array, data_type)?,
+        PType::I16 => as_arrow_array_primitive::<Int16Type>(&primitive_array, data_type)?,
+        PType::I32 => as_arrow_array_primitive::<Int32Type>(&primitive_array, data_type)?,
+        PType::I64 => as_arrow_array_primitive::<Int64Type>(&primitive_array, data_type)?,
+        PType::F16 => as_arrow_array_primitive::<Float16Type>(&primitive_array, data_type)?,
+        PType::F32 => as_arrow_array_primitive::<Float32Type>(&primitive_array, data_type)?,
+        PType::F64 => as_arrow_array_primitive::<Float64Type>(&primitive_array, data_type)?,
     })
 }

+// TODO: think about top-level struct array nullability; can the nullability of the target
+// Arrow struct array be specified?
-fn struct_to_arrow(struct_array: StructArray) -> VortexResult<ArrayRef> {
-    let field_arrays = struct_array
-        .names()
+fn struct_to_arrow(struct_array: StructArray, data_type: &DataType) -> VortexResult<ArrayRef> {
+    let target_fields = match data_type {
+        DataType::Struct(fields) => fields,
+        _ => vortex_bail!("Expected DataType::Struct, got {:?}", data_type),
+    };
+
+    let field_arrays = target_fields
         .iter()
-        .zip(struct_array.children())
-        .map(|(name, f)| {
-            f.into_canonical()
-                .map_err(|err| err.with_context(format!("Failed to canonicalize field {}", name)))
+        .zip_eq(struct_array.children())
+        .map(|(field, arr)| {
+            arr.into_canonical()
+                .map_err(|err| {
+                    err.with_context(format!("Failed to canonicalize field {}", field.name()))
+                })
                 .and_then(|c| c.into_arrow())
         })
         .collect::<VortexResult<Vec<ArrayRef>>>()?;
@@ -235,25 +257,39 @@ fn struct_to_arrow(struct_array: StructArray) -> VortexResult<ArrayRef> {
     }
 }

+pub(crate) fn varbinview_to_arrow(
+    var_bin_view: &VarBinViewArray,
+    data_type: &DataType,
+) -> VortexResult<ArrayRef> {
+    let arrow_arr = varbinview_as_arrow(var_bin_view);
+    // Note: arrow_cast::cast clones the array.
+    Ok(match data_type {
+        DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {
+            cast(arrow_arr.as_ref(), data_type)?
+        }
+        DataType::Utf8View | DataType::BinaryView => arrow_arr,
+        _ => vortex_bail!("Unsupported data type: {:?}", data_type),
+    })
+}
+
 // TODO(joe): unify with varbin
-fn list_to_arrow(list: ListArray) -> VortexResult<ArrayRef> {
+fn list_to_arrow(list: ListArray, data_type: &DataType) -> VortexResult<ArrayRef> {
     let offsets = list
         .offsets()
         .into_primitive()
         .map_err(|err| err.with_context("Failed to canonicalize offsets"))?;

-    let offsets = match offsets.ptype() {
-        PType::I32 | PType::I64 => offsets,
-        PType::U64 => try_cast(offsets, PType::I64.into())?.into_primitive()?,
-        PType::U32 => try_cast(offsets, PType::I32.into())?.into_primitive()?,
-
-        // Unless it's u64, everything else can be converted into an i32.
-        _ => try_cast(offsets.to_array(), PType::I32.into())
-            .and_then(|a| a.into_primitive())
-            .map_err(|err| err.with_context("Failed to cast offsets to PrimitiveArray of i32"))?,
+    let (cast_ptype, element_dtype) = match data_type {
+        DataType::List(field) => (PType::I32, field.data_type()),
+        DataType::LargeList(field) => (PType::I64, field.data_type()),
+        _ => vortex_bail!("list_to_arrow: unsupported data type: {:?}", data_type),
     };

-    let values = list.elements().into_arrow()?;
+    let offsets = try_cast(offsets, cast_ptype.into())
+        .map_err(|err| err.with_context("Failed to cast offsets to PrimitiveArray"))?
+        .into_primitive()?;
+
+    let values = list.elements().into_arrow_with_data_type(element_dtype)?;

     let field_ref = FieldRef::new(Field::new_list_field(
         values.data_type().clone(),
@@ -372,6 +408,13 @@ pub trait IntoCanonical {
     {
         self.into_canonical()?.into_arrow()
     }
+
+    fn into_arrow_with_data_type(self, data_type: &DataType) -> VortexResult<ArrayRef>
+    where
+        Self: Sized,
+    {
+        self.into_canonical()?.into_arrow_with_data_type(data_type)
+    }
 }

 /// Encoding VTable for canonicalizing an array.
@@ -380,6 +423,19 @@ pub trait IntoCanonicalVTable {
     fn into_canonical(&self, array: ArrayData) -> VortexResult<Canonical>;

     fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef>;
+
+    fn into_arrow_with_data_type(
+        &self,
+        array: ArrayData,
+        data_type: &DataType,
+    ) -> VortexResult<ArrayRef> {
+        let arrow_array = self.into_arrow(array)?;
+        if arrow_array.data_type() != data_type {
+            Ok(cast(&arrow_array, data_type)?)
+        } else {
+            Ok(arrow_array)
+        }
+    }
 }

 /// Implement the [IntoCanonicalVTable] for all encodings with arrays implementing [IntoCanonical].
@@ -399,6 +455,14 @@ where
     fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef> {
         E::Array::try_from(array)?.into_arrow()
     }
+
+    fn into_arrow_with_data_type(
+        &self,
+        array: ArrayData,
+        data_type: &DataType,
+    ) -> VortexResult<ArrayRef> {
+        E::Array::try_from(array)?.into_arrow_with_data_type(data_type)
+    }
 }

 /// Trait for types that can be converted from an owned type into an owned array variant.
@@ -468,9 +532,16 @@ impl IntoCanonical for ArrayData {
         self.encoding().into_canonical(self)
     }

-    fn into_arrow(self) -> VortexResult<ArrayRef> {
+    fn into_arrow(self) -> VortexResult<ArrayRef>
+    where
+        Self: Sized,
+    {
         self.encoding().into_arrow(self)
     }
+
+    fn into_arrow_with_data_type(self, data_type: &DataType) -> VortexResult<ArrayRef> {
+        self.encoding().into_arrow_with_data_type(self, data_type)
+    }
 }

 /// This conversion is always "free" and should not touch underlying data. All it does is create an
@@ -527,16 +598,17 @@ mod test {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Int32Type, Int64Type, UInt64Type};
     use arrow_array::{
-        ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, StringArray,
-        StringViewArray, StructArray as ArrowStructArray,
+        Array, ArrayRef, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray,
+        StringArray, StringViewArray, StructArray as ArrowStructArray,
     };
     use arrow_buffer::{NullBufferBuilder, OffsetBuffer};
+    use arrow_cast::cast;
     use arrow_schema::{DataType, Field};

     use crate::array::{PrimitiveArray, SparseArray, StructArray};
-    use crate::arrow::FromArrowArray;
+    use crate::arrow::{infer_data_type, FromArrowArray};
     use crate::validity::Validity;
-    use crate::{ArrayData, IntoArrayData, IntoCanonical};
+    use crate::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};

     #[test]
     fn test_canonicalize_nested_struct() {
@@ -569,8 +641,9 @@ mod test {
         ])
         .unwrap();
+        let data_type = infer_data_type(nested_struct_array.dtype()).unwrap();
         let arrow_struct = nested_struct_array
-            .into_arrow()
+            .into_arrow_with_data_type(&data_type)
             .unwrap()
             .as_any()
             .downcast_ref::<ArrowStructArray>()
             .unwrap()
@@ -637,10 +710,14 @@ mod test {
         );

         let vortex_struct = ArrayData::from_arrow(&arrow_struct, true);
+        let data_type = infer_data_type(vortex_struct.dtype()).unwrap();

         assert_eq!(
             &arrow_struct,
-            vortex_struct.into_arrow().unwrap().as_struct()
+            vortex_struct
+                .into_arrow_with_data_type(&data_type)
+                .unwrap()
+                .as_struct()
         );
     }

@@ -658,9 +735,15 @@ mod test {
             names,
             None,
         );
+        let list_data_type = arrow_list.data_type();

         let vortex_list = ArrayData::from_arrow(&arrow_list, true);

-        assert_eq!(&arrow_list, vortex_list.into_arrow().unwrap().as_list());
+        let rt_arrow_list = cast(&vortex_list.into_arrow().unwrap(), list_data_type).unwrap();
+
+        assert_eq!(
+            (Arc::new(arrow_list.clone()) as ArrayRef).as_ref(),
+            rt_arrow_list.as_ref()
+        );
     }
 }
diff --git a/vortex-datafusion/src/memory/stream.rs b/vortex-datafusion/src/memory/stream.rs
index bf69393e2c..9e2488ca6e 100644
--- a/vortex-datafusion/src/memory/stream.rs
+++ b/vortex-datafusion/src/memory/stream.rs
@@ -43,7 +43,7 @@ impl Stream for VortexRecordBatchStream {
             exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
         })?;

-        Poll::Ready(Some(Ok(projected_struct.try_into()?)))
+        Poll::Ready(Some(Ok(projected_struct.into_record_batch()?)))
     }

     fn size_hint(&self) -> (usize, Option<usize>) {
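
Usage sketch (not part of the patch): a minimal illustration of how the conversions added above might be called. Only `into_arrow_with_data_type`, `into_record_batch_with_schema`, and `infer_data_type` come from the diff itself; the `vortex_array` import paths, function names, and surrounding setup are assumptions for illustration.

    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_schema::Schema;
    use vortex_array::array::StructArray;
    use vortex_array::arrow::infer_data_type;
    use vortex_array::{ArrayDType, ArrayData, IntoCanonical};
    use vortex_error::VortexResult;

    // Convert any Vortex array to Arrow, naming the target Arrow type explicitly instead of
    // relying solely on the type inferred from the array's DType.
    fn to_arrow_explicit(array: ArrayData) -> VortexResult<ArrayRef> {
        let data_type = infer_data_type(array.dtype())?;
        array.into_arrow_with_data_type(&data_type)
    }

    // Build a RecordBatch against a fixed Arrow schema; the schema's fields become the
    // DataType::Struct target used for the conversion.
    fn to_record_batch(struct_array: StructArray, schema: &Schema) -> VortexResult<RecordBatch> {
        struct_array.into_record_batch_with_schema(schema)
    }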