From e642cc2a94f38518d765d25c8113523aedc29198 Mon Sep 17 00:00:00 2001 From: Junjun Dong Date: Sat, 11 Nov 2023 06:40:20 -0800 Subject: [PATCH] chore: remove panics in datafusion-common::scalar (#7901) --- datafusion/common/src/pyarrow.rs | 2 +- datafusion/common/src/scalar.rs | 598 ++++++++++-------- datafusion/core/benches/scalar.rs | 10 +- .../core/src/datasource/listing/helpers.rs | 5 +- .../physical_plan/file_scan_config.rs | 12 +- .../physical_plan/parquet/row_filter.rs | 2 +- .../physical_plan/parquet/row_groups.rs | 4 +- datafusion/expr/src/columnar_value.rs | 14 +- datafusion/expr/src/window_state.rs | 2 +- .../src/unwrap_cast_in_comparison.rs | 8 +- .../src/aggregate/correlation.rs | 12 +- .../physical-expr/src/aggregate/covariance.rs | 12 +- .../physical-expr/src/aggregate/first_last.rs | 10 +- .../physical-expr/src/aggregate/stddev.rs | 12 +- .../physical-expr/src/aggregate/utils.rs | 4 +- .../physical-expr/src/aggregate/variance.rs | 12 +- .../src/conditional_expressions.rs | 2 +- .../physical-expr/src/datetime_expressions.rs | 2 +- .../physical-expr/src/expressions/binary.rs | 49 +- .../physical-expr/src/expressions/case.rs | 64 +- .../physical-expr/src/expressions/cast.rs | 12 +- .../physical-expr/src/expressions/datum.rs | 14 +- .../src/expressions/get_indexed_field.rs | 33 +- .../physical-expr/src/expressions/in_list.rs | 11 +- .../src/expressions/is_not_null.rs | 5 +- .../physical-expr/src/expressions/is_null.rs | 5 +- .../physical-expr/src/expressions/like.rs | 5 +- .../physical-expr/src/expressions/literal.rs | 5 +- .../physical-expr/src/expressions/mod.rs | 12 +- .../physical-expr/src/expressions/negative.rs | 2 +- .../physical-expr/src/expressions/not.rs | 5 +- .../physical-expr/src/expressions/nullif.rs | 20 +- .../physical-expr/src/expressions/try_cast.rs | 12 +- datafusion/physical-expr/src/functions.rs | 63 +- .../src/intervals/interval_aritmetic.rs | 2 +- .../physical-expr/src/math_expressions.rs | 3 +- 
datafusion/physical-expr/src/planner.rs | 2 +- .../physical-expr/src/struct_expressions.rs | 15 +- .../window/built_in_window_function_expr.rs | 6 +- .../physical-expr/src/window/lead_lag.rs | 1 + .../physical-expr/src/window/window_expr.rs | 6 +- .../physical-plan/src/aggregates/mod.rs | 29 +- .../src/aggregates/no_grouping.rs | 6 +- datafusion/physical-plan/src/filter.rs | 2 +- .../physical-plan/src/joins/cross_join.rs | 2 +- .../physical-plan/src/joins/hash_join.rs | 8 +- .../src/joins/hash_join_utils.rs | 2 +- .../src/joins/symmetric_hash_join.rs | 6 +- datafusion/physical-plan/src/joins/utils.rs | 2 +- datafusion/physical-plan/src/projection.rs | 6 +- .../physical-plan/src/repartition/mod.rs | 4 +- datafusion/physical-plan/src/sorts/stream.rs | 4 +- datafusion/physical-plan/src/topk/mod.rs | 2 +- datafusion/physical-plan/src/unnest.rs | 2 +- 54 files changed, 723 insertions(+), 427 deletions(-) diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs index 59a8b811e3c8..aa0153919360 100644 --- a/datafusion/common/src/pyarrow.rs +++ b/datafusion/common/src/pyarrow.rs @@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue { impl ToPyArrow for ScalarValue { fn to_pyarrow(&self, py: Python) -> PyResult { - let array = self.to_array(); + let array = self.to_array()?; // convert to pyarrow array using C data interface let pyarray = array.to_data().to_pyarrow(py)?; let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?; diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 0d701eaad283..cdcc9aa4fbc5 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -330,9 +330,9 @@ impl PartialOrd for ScalarValue { let arr2 = list_arr2.value(i); let lt_res = - arrow::compute::kernels::cmp::lt(&arr1, &arr2).unwrap(); + arrow::compute::kernels::cmp::lt(&arr1, &arr2).ok()?; let eq_res = - arrow::compute::kernels::cmp::eq(&arr1, &arr2).unwrap(); + arrow::compute::kernels::cmp::eq(&arr1, 
&arr2).ok()?; for j in 0..lt_res.len() { if lt_res.is_valid(j) && lt_res.value(j) { @@ -431,6 +431,10 @@ macro_rules! hash_float_value { hash_float_value!((f64, u64), (f32, u32)); // manual implementation of `Hash` +// +// # Panics +// +// Panics if there is an error when creating hash values for rows impl std::hash::Hash for ScalarValue { fn hash(&self, state: &mut H) { use ScalarValue::*; @@ -506,15 +510,19 @@ impl std::hash::Hash for ScalarValue { } } -/// return a reference to the values array and the index into it for a +/// Return a reference to the values array and the index into it for a /// dictionary array +/// +/// # Errors +/// +/// Errors if the array cannot be downcasted to DictionaryArray #[inline] pub fn get_dict_value( array: &dyn Array, index: usize, -) -> (&ArrayRef, Option) { - let dict_array = as_dictionary_array::(array).unwrap(); - (dict_array.values(), dict_array.key(index)) +) -> Result<(&ArrayRef, Option)> { + let dict_array = as_dictionary_array::(array)?; + Ok((dict_array.values(), dict_array.key(index))) } /// Create a dictionary array representing `value` repeated `size` @@ -522,9 +530,9 @@ pub fn get_dict_value( fn dict_from_scalar( value: &ScalarValue, size: usize, -) -> ArrayRef { +) -> Result { // values array is one element long (the value) - let values_array = value.to_array_of_size(1); + let values_array = value.to_array_of_size(1)?; // Create a key array with `size` elements, each of 0 let key_array: PrimitiveArray = std::iter::repeat(Some(K::default_value())) @@ -536,11 +544,9 @@ fn dict_from_scalar( // Note: this path could be made faster by using the ArrayData // APIs and skipping validation, if it every comes up in // performance traces. 
- Arc::new( - DictionaryArray::::try_new(key_array, values_array) - // should always be valid by construction above - .expect("Can not construct dictionary array"), - ) + Ok(Arc::new( + DictionaryArray::::try_new(key_array, values_array)?, // should always be valid by construction above + )) } /// Create a dictionary array representing all the values in values @@ -579,24 +585,44 @@ fn dict_from_values( macro_rules! typed_cast_tz { ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{ - let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); - ScalarValue::$SCALAR( + use std::any::type_name; + let array = $array + .as_any() + .downcast_ref::<$ARRAYTYPE>() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast value to {}", + type_name::<$ARRAYTYPE>() + )) + })?; + Ok::(ScalarValue::$SCALAR( match array.is_null($index) { true => None, false => Some(array.value($index).into()), }, $TZ.clone(), - ) + )) }}; } macro_rules! typed_cast { ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{ - let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); - ScalarValue::$SCALAR(match array.is_null($index) { - true => None, - false => Some(array.value($index).into()), - }) + use std::any::type_name; + let array = $array + .as_any() + .downcast_ref::<$ARRAYTYPE>() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast value to {}", + type_name::<$ARRAYTYPE>() + )) + })?; + Ok::(ScalarValue::$SCALAR( + match array.is_null($index) { + true => None, + false => Some(array.value($index).into()), + }, + )) }}; } @@ -628,12 +654,21 @@ macro_rules! build_timestamp_array_from_option { macro_rules! 
eq_array_primitive { ($array:expr, $index:expr, $ARRAYTYPE:ident, $VALUE:expr) => {{ - let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); + use std::any::type_name; + let array = $array + .as_any() + .downcast_ref::<$ARRAYTYPE>() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast value to {}", + type_name::<$ARRAYTYPE>() + )) + })?; let is_valid = array.is_valid($index); - match $VALUE { + Ok::(match $VALUE { Some(val) => is_valid && &array.value($index) == val, None => !is_valid, - } + }) }}; } @@ -935,7 +970,7 @@ impl ScalarValue { /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code /// should operate on Arrays directly, using vectorized array kernels pub fn add>(&self, other: T) -> Result { - let r = add_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?; + let r = add_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?; Self::try_from_array(r.as_ref(), 0) } /// Checked addition of `ScalarValue` @@ -943,7 +978,7 @@ impl ScalarValue { /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code /// should operate on Arrays directly, using vectorized array kernels pub fn add_checked>(&self, other: T) -> Result { - let r = add(&self.to_scalar(), &other.borrow().to_scalar())?; + let r = add(&self.to_scalar()?, &other.borrow().to_scalar()?)?; Self::try_from_array(r.as_ref(), 0) } @@ -952,7 +987,7 @@ impl ScalarValue { /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code /// should operate on Arrays directly, using vectorized array kernels pub fn sub>(&self, other: T) -> Result { - let r = sub_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?; + let r = sub_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?; Self::try_from_array(r.as_ref(), 0) } @@ -961,7 +996,7 @@ impl ScalarValue { /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code /// should operate on Arrays 
directly, using vectorized array kernels pub fn sub_checked>(&self, other: T) -> Result { - let r = sub(&self.to_scalar(), &other.borrow().to_scalar())?; + let r = sub(&self.to_scalar()?, &other.borrow().to_scalar()?)?; Self::try_from_array(r.as_ref(), 0) } @@ -1050,7 +1085,11 @@ impl ScalarValue { } /// Converts a scalar value into an 1-row array. - pub fn to_array(&self) -> ArrayRef { + /// + /// # Errors + /// + /// Errors if the ScalarValue cannot be converted into a 1-row array + pub fn to_array(&self) -> Result { self.to_array_of_size(1) } @@ -1059,6 +1098,10 @@ impl ScalarValue { /// /// This can be used to call arrow compute kernels such as `lt` /// + /// # Errors + /// + /// Errors if the ScalarValue cannot be converted into a 1-row array + /// /// # Example /// ``` /// use datafusion_common::ScalarValue; @@ -1069,7 +1112,7 @@ impl ScalarValue { /// /// let result = arrow::compute::kernels::cmp::lt( /// &arr, - /// &five.to_scalar(), + /// &five.to_scalar().unwrap(), /// ).unwrap(); /// /// let expected = BooleanArray::from(vec![ @@ -1082,8 +1125,8 @@ impl ScalarValue { /// assert_eq!(&result, &expected); /// ``` /// [`Datum`]: arrow_array::Datum - pub fn to_scalar(&self) -> Scalar { - Scalar::new(self.to_array_of_size(1)) + pub fn to_scalar(&self) -> Result> { + Ok(Scalar::new(self.to_array_of_size(1)?)) } /// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`] @@ -1093,6 +1136,10 @@ impl ScalarValue { /// Returns an error if the iterator is empty or if the /// [`ScalarValue`]s are not all the same type /// + /// # Panics + /// + /// Panics if `self` is a dictionary with invalid key type + /// /// # Example /// ``` /// use datafusion_common::ScalarValue; @@ -1199,28 +1246,29 @@ impl ScalarValue { macro_rules! 
build_array_list_primitive { ($ARRAY_TY:ident, $SCALAR_TY:ident, $NATIVE_TYPE:ident) => {{ - Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>( + Ok::(Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>( scalars.into_iter().map(|x| match x { ScalarValue::List(arr) => { // `ScalarValue::List` contains a single element `ListArray`. let list_arr = as_list_array(&arr); if list_arr.is_null(0) { - None + Ok(None) } else { let primitive_arr = list_arr.values().as_primitive::<$ARRAY_TY>(); - Some( + Ok(Some( primitive_arr.into_iter().collect::>>(), - ) + )) } } - sv => panic!( + sv => _internal_err!( "Inconsistent types in ScalarValue::iter_to_array. \ Expected {:?}, got {:?}", data_type, sv ), - }), - )) + }) + .collect::>>()?, + ))) }}; } @@ -1273,7 +1321,7 @@ impl ScalarValue { ScalarValue::iter_to_decimal256_array(scalars, *precision, *scale)?; Arc::new(decimal_array) } - DataType::Null => ScalarValue::iter_to_null_array(scalars), + DataType::Null => ScalarValue::iter_to_null_array(scalars)?, DataType::Boolean => build_array_primitive!(BooleanArray, Boolean), DataType::Float32 => build_array_primitive!(Float32Array, Float32), DataType::Float64 => build_array_primitive!(Float64Array, Float64), @@ -1337,34 +1385,34 @@ impl ScalarValue { build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano) } DataType::List(fields) if fields.data_type() == &DataType::Int8 => { - build_array_list_primitive!(Int8Type, Int8, i8) + build_array_list_primitive!(Int8Type, Int8, i8)? } DataType::List(fields) if fields.data_type() == &DataType::Int16 => { - build_array_list_primitive!(Int16Type, Int16, i16) + build_array_list_primitive!(Int16Type, Int16, i16)? } DataType::List(fields) if fields.data_type() == &DataType::Int32 => { - build_array_list_primitive!(Int32Type, Int32, i32) + build_array_list_primitive!(Int32Type, Int32, i32)? 
} DataType::List(fields) if fields.data_type() == &DataType::Int64 => { - build_array_list_primitive!(Int64Type, Int64, i64) + build_array_list_primitive!(Int64Type, Int64, i64)? } DataType::List(fields) if fields.data_type() == &DataType::UInt8 => { - build_array_list_primitive!(UInt8Type, UInt8, u8) + build_array_list_primitive!(UInt8Type, UInt8, u8)? } DataType::List(fields) if fields.data_type() == &DataType::UInt16 => { - build_array_list_primitive!(UInt16Type, UInt16, u16) + build_array_list_primitive!(UInt16Type, UInt16, u16)? } DataType::List(fields) if fields.data_type() == &DataType::UInt32 => { - build_array_list_primitive!(UInt32Type, UInt32, u32) + build_array_list_primitive!(UInt32Type, UInt32, u32)? } DataType::List(fields) if fields.data_type() == &DataType::UInt64 => { - build_array_list_primitive!(UInt64Type, UInt64, u64) + build_array_list_primitive!(UInt64Type, UInt64, u64)? } DataType::List(fields) if fields.data_type() == &DataType::Float32 => { - build_array_list_primitive!(Float32Type, Float32, f32) + build_array_list_primitive!(Float32Type, Float32, f32)? } DataType::List(fields) if fields.data_type() == &DataType::Float64 => { - build_array_list_primitive!(Float64Type, Float64, f64) + build_array_list_primitive!(Float64Type, Float64, f64)? 
} DataType::List(fields) if fields.data_type() == &DataType::Utf8 => { build_array_list_string!(StringBuilder, as_string_array) @@ -1432,7 +1480,7 @@ impl ScalarValue { if &inner_key_type == key_type { Ok(*scalar) } else { - panic!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})"); + _internal_err!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})") } } _ => { @@ -1504,15 +1552,19 @@ impl ScalarValue { Ok(array) } - fn iter_to_null_array(scalars: impl IntoIterator) -> ArrayRef { - let length = - scalars - .into_iter() - .fold(0usize, |r, element: ScalarValue| match element { - ScalarValue::Null => r + 1, - _ => unreachable!(), - }); - new_null_array(&DataType::Null, length) + fn iter_to_null_array( + scalars: impl IntoIterator, + ) -> Result { + let length = scalars.into_iter().try_fold( + 0usize, + |r, element: ScalarValue| match element { + ScalarValue::Null => Ok::(r + 1), + s => { + _internal_err!("Expected ScalarValue::Null element. Received {s:?}") + } + }, + )?; + Ok(new_null_array(&DataType::Null, length)) } fn iter_to_decimal_array( @@ -1523,10 +1575,12 @@ impl ScalarValue { let array = scalars .into_iter() .map(|element: ScalarValue| match element { - ScalarValue::Decimal128(v1, _, _) => v1, - _ => unreachable!(), + ScalarValue::Decimal128(v1, _, _) => Ok(v1), + s => { + _internal_err!("Expected ScalarValue::Null element. Received {s:?}") + } }) - .collect::() + .collect::>()? .with_precision_and_scale(precision, scale)?; Ok(array) } @@ -1539,10 +1593,14 @@ impl ScalarValue { let array = scalars .into_iter() .map(|element: ScalarValue| match element { - ScalarValue::Decimal256(v1, _, _) => v1, - _ => unreachable!(), + ScalarValue::Decimal256(v1, _, _) => Ok(v1), + s => { + _internal_err!( + "Expected ScalarValue::Decimal256 element. Received {s:?}" + ) + } }) - .collect::() + .collect::>()? 
.with_precision_and_scale(precision, scale)?; Ok(array) } @@ -1607,17 +1665,17 @@ impl ScalarValue { precision: u8, scale: i8, size: usize, - ) -> Decimal128Array { + ) -> Result { match value { Some(val) => Decimal128Array::from(vec![val; size]) .with_precision_and_scale(precision, scale) - .unwrap(), + .map_err(DataFusionError::ArrowError), None => { let mut builder = Decimal128Array::builder(size) .with_precision_and_scale(precision, scale) - .unwrap(); + .map_err(DataFusionError::ArrowError)?; builder.append_nulls(size); - builder.finish() + Ok(builder.finish()) } } } @@ -1627,12 +1685,12 @@ impl ScalarValue { precision: u8, scale: i8, size: usize, - ) -> Decimal256Array { + ) -> Result { std::iter::repeat(value) .take(size) .collect::() .with_precision_and_scale(precision, scale) - .unwrap() + .map_err(DataFusionError::ArrowError) } /// Converts `Vec` where each element has type corresponding to @@ -1671,13 +1729,21 @@ impl ScalarValue { } /// Converts a scalar value into an array of `size` rows. 
- pub fn to_array_of_size(&self, size: usize) -> ArrayRef { - match self { + /// + /// # Errors + /// + /// Errors if `self` is + /// - a decimal that fails to be converted to a decimal array of size + /// - a `Fixedsizelist` that is not supported yet + /// - a `List` that fails to be concatenated into an array of size + /// - a `Dictionary` that fails to be converted to a dictionary array of size + pub fn to_array_of_size(&self, size: usize) -> Result { + Ok(match self { ScalarValue::Decimal128(e, precision, scale) => Arc::new( - ScalarValue::build_decimal_array(*e, *precision, *scale, size), + ScalarValue::build_decimal_array(*e, *precision, *scale, size)?, ), ScalarValue::Decimal256(e, precision, scale) => Arc::new( - ScalarValue::build_decimal256_array(*e, *precision, *scale, size), + ScalarValue::build_decimal256_array(*e, *precision, *scale, size)?, ), ScalarValue::Boolean(e) => { Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef @@ -1790,13 +1856,14 @@ impl ScalarValue { ), }, ScalarValue::Fixedsizelist(..) => { - unimplemented!("FixedSizeList is not supported yet") + return _not_impl_err!("FixedSizeList is not supported yet") } ScalarValue::List(arr) => { let arrays = std::iter::repeat(arr.as_ref()) .take(size) .collect::>(); - arrow::compute::concat(arrays.as_slice()).unwrap() + arrow::compute::concat(arrays.as_slice()) + .map_err(DataFusionError::ArrowError)?
} ScalarValue::Date32(e) => { build_array_from_option!(Date32, Date32Array, e, size) @@ -1891,13 +1958,13 @@ impl ScalarValue { ), ScalarValue::Struct(values, fields) => match values { Some(values) => { - let field_values: Vec<_> = fields + let field_values = fields .iter() .zip(values.iter()) .map(|(field, value)| { - (field.clone(), value.to_array_of_size(size)) + Ok((field.clone(), value.to_array_of_size(size)?)) }) - .collect(); + .collect::>>()?; Arc::new(StructArray::from(field_values)) } @@ -1909,19 +1976,19 @@ impl ScalarValue { ScalarValue::Dictionary(key_type, v) => { // values array is one element long (the value) match key_type.as_ref() { - DataType::Int8 => dict_from_scalar::(v, size), - DataType::Int16 => dict_from_scalar::(v, size), - DataType::Int32 => dict_from_scalar::(v, size), - DataType::Int64 => dict_from_scalar::(v, size), - DataType::UInt8 => dict_from_scalar::(v, size), - DataType::UInt16 => dict_from_scalar::(v, size), - DataType::UInt32 => dict_from_scalar::(v, size), - DataType::UInt64 => dict_from_scalar::(v, size), + DataType::Int8 => dict_from_scalar::(v, size)?, + DataType::Int16 => dict_from_scalar::(v, size)?, + DataType::Int32 => dict_from_scalar::(v, size)?, + DataType::Int64 => dict_from_scalar::(v, size)?, + DataType::UInt8 => dict_from_scalar::(v, size)?, + DataType::UInt16 => dict_from_scalar::(v, size)?, + DataType::UInt32 => dict_from_scalar::(v, size)?, + DataType::UInt64 => dict_from_scalar::(v, size)?, _ => unreachable!("Invalid dictionary keys type: {:?}", key_type), } } ScalarValue::Null => new_null_array(&DataType::Null, size), - } + }) } fn get_decimal_value_from_array( @@ -2037,23 +2104,25 @@ impl ScalarValue { array, index, *precision, *scale, )? 
} - DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean), - DataType::Float64 => typed_cast!(array, index, Float64Array, Float64), - DataType::Float32 => typed_cast!(array, index, Float32Array, Float32), - DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64), - DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32), - DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16), - DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8), - DataType::Int64 => typed_cast!(array, index, Int64Array, Int64), - DataType::Int32 => typed_cast!(array, index, Int32Array, Int32), - DataType::Int16 => typed_cast!(array, index, Int16Array, Int16), - DataType::Int8 => typed_cast!(array, index, Int8Array, Int8), - DataType::Binary => typed_cast!(array, index, BinaryArray, Binary), + DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean)?, + DataType::Float64 => typed_cast!(array, index, Float64Array, Float64)?, + DataType::Float32 => typed_cast!(array, index, Float32Array, Float32)?, + DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64)?, + DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32)?, + DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16)?, + DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8)?, + DataType::Int64 => typed_cast!(array, index, Int64Array, Int64)?, + DataType::Int32 => typed_cast!(array, index, Int32Array, Int32)?, + DataType::Int16 => typed_cast!(array, index, Int16Array, Int16)?, + DataType::Int8 => typed_cast!(array, index, Int8Array, Int8)?, + DataType::Binary => typed_cast!(array, index, BinaryArray, Binary)?, DataType::LargeBinary => { - typed_cast!(array, index, LargeBinaryArray, LargeBinary) + typed_cast!(array, index, LargeBinaryArray, LargeBinary)? + } + DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8)?, + DataType::LargeUtf8 => { + typed_cast!(array, index, LargeStringArray, LargeUtf8)? 
} - DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8), - DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8), DataType::List(_) => { let list_array = as_list_array(array); let nested_array = list_array.value(index); @@ -2071,70 +2140,58 @@ impl ScalarValue { ScalarValue::List(arr) } - DataType::Date32 => { - typed_cast!(array, index, Date32Array, Date32) - } - DataType::Date64 => { - typed_cast!(array, index, Date64Array, Date64) - } + DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?, + DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?, DataType::Time32(TimeUnit::Second) => { - typed_cast!(array, index, Time32SecondArray, Time32Second) + typed_cast!(array, index, Time32SecondArray, Time32Second)? } DataType::Time32(TimeUnit::Millisecond) => { - typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond) + typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond)? } DataType::Time64(TimeUnit::Microsecond) => { - typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond) + typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond)? } DataType::Time64(TimeUnit::Nanosecond) => { - typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond) - } - DataType::Timestamp(TimeUnit::Second, tz_opt) => { - typed_cast_tz!( - array, - index, - TimestampSecondArray, - TimestampSecond, - tz_opt - ) - } - DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { - typed_cast_tz!( - array, - index, - TimestampMillisecondArray, - TimestampMillisecond, - tz_opt - ) - } - DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { - typed_cast_tz!( - array, - index, - TimestampMicrosecondArray, - TimestampMicrosecond, - tz_opt - ) - } - DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { - typed_cast_tz!( - array, - index, - TimestampNanosecondArray, - TimestampNanosecond, - tz_opt - ) + typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond)? 
} + DataType::Timestamp(TimeUnit::Second, tz_opt) => typed_cast_tz!( + array, + index, + TimestampSecondArray, + TimestampSecond, + tz_opt + )?, + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_cast_tz!( + array, + index, + TimestampMillisecondArray, + TimestampMillisecond, + tz_opt + )?, + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_cast_tz!( + array, + index, + TimestampMicrosecondArray, + TimestampMicrosecond, + tz_opt + )?, + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_cast_tz!( + array, + index, + TimestampNanosecondArray, + TimestampNanosecond, + tz_opt + )?, DataType::Dictionary(key_type, _) => { let (values_array, values_index) = match key_type.as_ref() { - DataType::Int8 => get_dict_value::(array, index), - DataType::Int16 => get_dict_value::(array, index), - DataType::Int32 => get_dict_value::(array, index), - DataType::Int64 => get_dict_value::(array, index), - DataType::UInt8 => get_dict_value::(array, index), - DataType::UInt16 => get_dict_value::(array, index), - DataType::UInt32 => get_dict_value::(array, index), - DataType::UInt64 => get_dict_value::(array, index), + DataType::Int8 => get_dict_value::(array, index)?, + DataType::Int16 => get_dict_value::(array, index)?, + DataType::Int32 => get_dict_value::(array, index)?, + DataType::Int64 => get_dict_value::(array, index)?, + DataType::UInt8 => get_dict_value::(array, index)?, + DataType::UInt16 => get_dict_value::(array, index)?, + DataType::UInt32 => get_dict_value::(array, index)?, + DataType::UInt64 => get_dict_value::(array, index)?, _ => unreachable!("Invalid dictionary keys type: {:?}", key_type), }; // look up the index in the values dictionary @@ -2173,31 +2230,29 @@ impl ScalarValue { ) } DataType::Interval(IntervalUnit::DayTime) => { - typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime) + typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime)? 
} DataType::Interval(IntervalUnit::YearMonth) => { - typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth) - } - DataType::Interval(IntervalUnit::MonthDayNano) => { - typed_cast!( - array, - index, - IntervalMonthDayNanoArray, - IntervalMonthDayNano - ) + typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth)? } + DataType::Interval(IntervalUnit::MonthDayNano) => typed_cast!( + array, + index, + IntervalMonthDayNanoArray, + IntervalMonthDayNano + )?, DataType::Duration(TimeUnit::Second) => { - typed_cast!(array, index, DurationSecondArray, DurationSecond) + typed_cast!(array, index, DurationSecondArray, DurationSecond)? } DataType::Duration(TimeUnit::Millisecond) => { - typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond) + typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond)? } DataType::Duration(TimeUnit::Microsecond) => { - typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond) + typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond)? } DataType::Duration(TimeUnit::Nanosecond) => { - typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond) + typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)? } other => { @@ -2215,7 +2270,7 @@ impl ScalarValue { safe: false, format_options: Default::default(), }; - let cast_arr = cast_with_options(&value.to_array(), target_type, &cast_options)?; + let cast_arr = cast_with_options(&value.to_array()?, target_type, &cast_options)?; ScalarValue::try_from_array(&cast_arr, 0) } @@ -2273,9 +2328,21 @@ impl ScalarValue { /// /// This function has a few narrow usescases such as hash table key /// comparisons where comparing a single row at a time is necessary. 
+ /// + /// # Errors + /// + /// Errors if + /// - it fails to downcast `array` to the data type of `self` + /// - `self` is a `Fixedsizelist` + /// - `self` is a `List` + /// - `self` is a `Struct` + /// + /// # Panics + /// + /// Panics if `self` is a dictionary with invalid key type #[inline] - pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool { - match self { + pub fn eq_array(&self, array: &ArrayRef, index: usize) -> Result { + Ok(match self { ScalarValue::Decimal128(v, precision, scale) => { ScalarValue::eq_array_decimal( array, @@ -2283,8 +2350,7 @@ impl ScalarValue { v.as_ref(), *precision, *scale, - ) - .unwrap() + )? } ScalarValue::Decimal256(v, precision, scale) => { ScalarValue::eq_array_decimal256( @@ -2293,119 +2359,132 @@ impl ScalarValue { v.as_ref(), *precision, *scale, - ) - .unwrap() + )? } ScalarValue::Boolean(val) => { - eq_array_primitive!(array, index, BooleanArray, val) + eq_array_primitive!(array, index, BooleanArray, val)? } ScalarValue::Float32(val) => { - eq_array_primitive!(array, index, Float32Array, val) + eq_array_primitive!(array, index, Float32Array, val)? } ScalarValue::Float64(val) => { - eq_array_primitive!(array, index, Float64Array, val) + eq_array_primitive!(array, index, Float64Array, val)? + } + ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val)?, + ScalarValue::Int16(val) => { + eq_array_primitive!(array, index, Int16Array, val)? + } + ScalarValue::Int32(val) => { + eq_array_primitive!(array, index, Int32Array, val)? + } + ScalarValue::Int64(val) => { + eq_array_primitive!(array, index, Int64Array, val)? + } + ScalarValue::UInt8(val) => { + eq_array_primitive!(array, index, UInt8Array, val)? 
} - ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val), - ScalarValue::Int16(val) => eq_array_primitive!(array, index, Int16Array, val), - ScalarValue::Int32(val) => eq_array_primitive!(array, index, Int32Array, val), - ScalarValue::Int64(val) => eq_array_primitive!(array, index, Int64Array, val), - ScalarValue::UInt8(val) => eq_array_primitive!(array, index, UInt8Array, val), ScalarValue::UInt16(val) => { - eq_array_primitive!(array, index, UInt16Array, val) + eq_array_primitive!(array, index, UInt16Array, val)? } ScalarValue::UInt32(val) => { - eq_array_primitive!(array, index, UInt32Array, val) + eq_array_primitive!(array, index, UInt32Array, val)? } ScalarValue::UInt64(val) => { - eq_array_primitive!(array, index, UInt64Array, val) + eq_array_primitive!(array, index, UInt64Array, val)? + } + ScalarValue::Utf8(val) => { + eq_array_primitive!(array, index, StringArray, val)? } - ScalarValue::Utf8(val) => eq_array_primitive!(array, index, StringArray, val), ScalarValue::LargeUtf8(val) => { - eq_array_primitive!(array, index, LargeStringArray, val) + eq_array_primitive!(array, index, LargeStringArray, val)? } ScalarValue::Binary(val) => { - eq_array_primitive!(array, index, BinaryArray, val) + eq_array_primitive!(array, index, BinaryArray, val)? } ScalarValue::FixedSizeBinary(_, val) => { - eq_array_primitive!(array, index, FixedSizeBinaryArray, val) + eq_array_primitive!(array, index, FixedSizeBinaryArray, val)? } ScalarValue::LargeBinary(val) => { - eq_array_primitive!(array, index, LargeBinaryArray, val) + eq_array_primitive!(array, index, LargeBinaryArray, val)? + } + ScalarValue::Fixedsizelist(..) => { + return _not_impl_err!("FixedSizeList is not supported yet") } - ScalarValue::Fixedsizelist(..) 
=> unimplemented!(), - ScalarValue::List(_) => unimplemented!("ListArr"), + ScalarValue::List(_) => return _not_impl_err!("List is not supported yet"), ScalarValue::Date32(val) => { - eq_array_primitive!(array, index, Date32Array, val) + eq_array_primitive!(array, index, Date32Array, val)? } ScalarValue::Date64(val) => { - eq_array_primitive!(array, index, Date64Array, val) + eq_array_primitive!(array, index, Date64Array, val)? } ScalarValue::Time32Second(val) => { - eq_array_primitive!(array, index, Time32SecondArray, val) + eq_array_primitive!(array, index, Time32SecondArray, val)? } ScalarValue::Time32Millisecond(val) => { - eq_array_primitive!(array, index, Time32MillisecondArray, val) + eq_array_primitive!(array, index, Time32MillisecondArray, val)? } ScalarValue::Time64Microsecond(val) => { - eq_array_primitive!(array, index, Time64MicrosecondArray, val) + eq_array_primitive!(array, index, Time64MicrosecondArray, val)? } ScalarValue::Time64Nanosecond(val) => { - eq_array_primitive!(array, index, Time64NanosecondArray, val) + eq_array_primitive!(array, index, Time64NanosecondArray, val)? } ScalarValue::TimestampSecond(val, _) => { - eq_array_primitive!(array, index, TimestampSecondArray, val) + eq_array_primitive!(array, index, TimestampSecondArray, val)? } ScalarValue::TimestampMillisecond(val, _) => { - eq_array_primitive!(array, index, TimestampMillisecondArray, val) + eq_array_primitive!(array, index, TimestampMillisecondArray, val)? } ScalarValue::TimestampMicrosecond(val, _) => { - eq_array_primitive!(array, index, TimestampMicrosecondArray, val) + eq_array_primitive!(array, index, TimestampMicrosecondArray, val)? } ScalarValue::TimestampNanosecond(val, _) => { - eq_array_primitive!(array, index, TimestampNanosecondArray, val) + eq_array_primitive!(array, index, TimestampNanosecondArray, val)? 
} ScalarValue::IntervalYearMonth(val) => { - eq_array_primitive!(array, index, IntervalYearMonthArray, val) + eq_array_primitive!(array, index, IntervalYearMonthArray, val)? } ScalarValue::IntervalDayTime(val) => { - eq_array_primitive!(array, index, IntervalDayTimeArray, val) + eq_array_primitive!(array, index, IntervalDayTimeArray, val)? } ScalarValue::IntervalMonthDayNano(val) => { - eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val) + eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)? } ScalarValue::DurationSecond(val) => { - eq_array_primitive!(array, index, DurationSecondArray, val) + eq_array_primitive!(array, index, DurationSecondArray, val)? } ScalarValue::DurationMillisecond(val) => { - eq_array_primitive!(array, index, DurationMillisecondArray, val) + eq_array_primitive!(array, index, DurationMillisecondArray, val)? } ScalarValue::DurationMicrosecond(val) => { - eq_array_primitive!(array, index, DurationMicrosecondArray, val) + eq_array_primitive!(array, index, DurationMicrosecondArray, val)? } ScalarValue::DurationNanosecond(val) => { - eq_array_primitive!(array, index, DurationNanosecondArray, val) + eq_array_primitive!(array, index, DurationNanosecondArray, val)? 
+ } + ScalarValue::Struct(_, _) => { + return _not_impl_err!("Struct is not supported yet") } - ScalarValue::Struct(_, _) => unimplemented!(), ScalarValue::Dictionary(key_type, v) => { let (values_array, values_index) = match key_type.as_ref() { - DataType::Int8 => get_dict_value::(array, index), - DataType::Int16 => get_dict_value::(array, index), - DataType::Int32 => get_dict_value::(array, index), - DataType::Int64 => get_dict_value::(array, index), - DataType::UInt8 => get_dict_value::(array, index), - DataType::UInt16 => get_dict_value::(array, index), - DataType::UInt32 => get_dict_value::(array, index), - DataType::UInt64 => get_dict_value::(array, index), + DataType::Int8 => get_dict_value::(array, index)?, + DataType::Int16 => get_dict_value::(array, index)?, + DataType::Int32 => get_dict_value::(array, index)?, + DataType::Int64 => get_dict_value::(array, index)?, + DataType::UInt8 => get_dict_value::(array, index)?, + DataType::UInt16 => get_dict_value::(array, index)?, + DataType::UInt32 => get_dict_value::(array, index)?, + DataType::UInt64 => get_dict_value::(array, index)?, _ => unreachable!("Invalid dictionary keys type: {:?}", key_type), }; // was the value in the array non null? match values_index { - Some(values_index) => v.eq_array(values_array, values_index), + Some(values_index) => v.eq_array(values_array, values_index)?, None => v.is_null(), } } ScalarValue::Null => array.is_null(index), - } + }) } /// Estimate size if bytes including `Self`. For values with internal containers such as `String` @@ -2785,6 +2864,11 @@ macro_rules! 
format_option { }}; } +// Implement Display trait for ScalarValue +// +// # Panics +// +// Panics if there is an error when creating a visual representation of columns via `arrow::util::pretty` impl fmt::Display for ScalarValue { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -3031,7 +3115,9 @@ mod tests { ])]); let sv = ScalarValue::List(Arc::new(arr)); - let actual_arr = sv.to_array_of_size(2); + let actual_arr = sv + .to_array_of_size(2) + .expect("Failed to convert to array of size"); let actual_list_arr = as_list_array(&actual_arr); let arr = ListArray::from_iter_primitive::(vec![ @@ -3238,8 +3324,8 @@ mod tests { { let scalar_result = left.add_checked(&right); - let left_array = left.to_array(); - let right_array = right.to_array(); + let left_array = left.to_array().expect("Failed to convert to array"); + let right_array = right.to_array().expect("Failed to convert to array"); let arrow_left_array = left_array.as_primitive::(); let arrow_right_array = right_array.as_primitive::(); let arrow_result = kernels::numeric::add(arrow_left_array, arrow_right_array); @@ -3287,22 +3373,30 @@ mod tests { } // decimal scalar to array - let array = decimal_value.to_array(); + let array = decimal_value + .to_array() + .expect("Failed to convert to array"); let array = as_decimal128_array(&array)?; assert_eq!(1, array.len()); assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array.value(0)); // decimal scalar to array with size - let array = decimal_value.to_array_of_size(10); + let array = decimal_value + .to_array_of_size(10) + .expect("Failed to convert to array of size"); let array_decimal = as_decimal128_array(&array)?; assert_eq!(10, array.len()); assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array_decimal.value(0)); assert_eq!(123i128, array_decimal.value(9)); // test eq array - assert!(decimal_value.eq_array(&array, 1)); - assert!(decimal_value.eq_array(&array, 
5)); + assert!(decimal_value + .eq_array(&array, 1) + .expect("Failed to compare arrays")); + assert!(decimal_value + .eq_array(&array, 5) + .expect("Failed to compare arrays")); // test try from array assert_eq!( decimal_value, @@ -3349,13 +3443,16 @@ mod tests { assert!(ScalarValue::try_new_decimal128(1, 10, 2) .unwrap() - .eq_array(&array, 0)); + .eq_array(&array, 0) + .expect("Failed to compare arrays")); assert!(ScalarValue::try_new_decimal128(2, 10, 2) .unwrap() - .eq_array(&array, 1)); + .eq_array(&array, 1) + .expect("Failed to compare arrays")); assert!(ScalarValue::try_new_decimal128(3, 10, 2) .unwrap() - .eq_array(&array, 2)); + .eq_array(&array, 2) + .expect("Failed to compare arrays")); assert_eq!( ScalarValue::Decimal128(None, 10, 2), ScalarValue::try_from_array(&array, 3).unwrap() @@ -3442,14 +3539,14 @@ mod tests { #[test] fn scalar_value_to_array_u64() -> Result<()> { let value = ScalarValue::UInt64(Some(13u64)); - let array = value.to_array(); + let array = value.to_array().expect("Failed to convert to array"); let array = as_uint64_array(&array)?; assert_eq!(array.len(), 1); assert!(!array.is_null(0)); assert_eq!(array.value(0), 13); let value = ScalarValue::UInt64(None); - let array = value.to_array(); + let array = value.to_array().expect("Failed to convert to array"); let array = as_uint64_array(&array)?; assert_eq!(array.len(), 1); assert!(array.is_null(0)); @@ -3459,14 +3556,14 @@ mod tests { #[test] fn scalar_value_to_array_u32() -> Result<()> { let value = ScalarValue::UInt32(Some(13u32)); - let array = value.to_array(); + let array = value.to_array().expect("Failed to convert to array"); let array = as_uint32_array(&array)?; assert_eq!(array.len(), 1); assert!(!array.is_null(0)); assert_eq!(array.value(0), 13); let value = ScalarValue::UInt32(None); - let array = value.to_array(); + let array = value.to_array().expect("Failed to convert to array"); let array = as_uint32_array(&array)?; assert_eq!(array.len(), 1); 
assert!(array.is_null(0)); @@ -4025,7 +4122,9 @@ mod tests { for (index, scalar) in scalars.into_iter().enumerate() { assert!( - scalar.eq_array(&array, index), + scalar + .eq_array(&array, index) + .expect("Failed to compare arrays"), "Expected {scalar:?} to be equal to {array:?} at index {index}" ); @@ -4033,7 +4132,7 @@ mod tests { for other_index in 0..array.len() { if index != other_index { assert!( - !scalar.eq_array(&array, other_index), + !scalar.eq_array(&array, other_index).expect("Failed to compare arrays"), "Expected {scalar:?} to be NOT equal to {array:?} at index {other_index}" ); } @@ -4136,7 +4235,9 @@ mod tests { ); // Convert to length-2 array - let array = scalar.to_array_of_size(2); + let array = scalar + .to_array_of_size(2) + .expect("Failed to convert to array of size"); let expected = Arc::new(StructArray::from(vec![ ( @@ -4570,7 +4671,7 @@ mod tests { DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); - let array = scalar.to_array(); + let array = scalar.to_array().expect("Failed to convert to array"); assert_eq!(array.len(), 1); assert_eq!( array.data_type(), @@ -4607,7 +4708,7 @@ mod tests { // mimics how casting work on scalar values by `casting` `scalar` to `desired_type` fn check_scalar_cast(scalar: ScalarValue, desired_type: DataType) { // convert from scalar --> Array to call cast - let scalar_array = scalar.to_array(); + let scalar_array = scalar.to_array().expect("Failed to convert to array"); // cast the actual value let cast_array = kernels::cast::cast(&scalar_array, &desired_type).unwrap(); @@ -4616,7 +4717,9 @@ mod tests { assert_eq!(cast_scalar.data_type(), desired_type); // Some time later the "cast" scalar is turned back into an array: - let array = cast_scalar.to_array_of_size(10); + let array = cast_scalar + .to_array_of_size(10) + .expect("Failed to convert to array of size"); // The datatype should be "Dictionary" but is actually Utf8!!! 
assert_eq!(array.data_type(), &desired_type) @@ -5065,7 +5168,8 @@ mod tests { let arrays = scalars .iter() .map(ScalarValue::to_array) - .collect::>(); + .collect::>>() + .expect("Failed to convert to array"); let arrays = arrays.iter().map(|a| a.as_ref()).collect::>(); let array = concat(&arrays).unwrap(); check_array(array); diff --git a/datafusion/core/benches/scalar.rs b/datafusion/core/benches/scalar.rs index 30f21a964d5f..540f7212e96e 100644 --- a/datafusion/core/benches/scalar.rs +++ b/datafusion/core/benches/scalar.rs @@ -22,7 +22,15 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("to_array_of_size 100000", |b| { let scalar = ScalarValue::Int32(Some(100)); - b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0)) + b.iter(|| { + assert_eq!( + scalar + .to_array_of_size(100000) + .expect("Failed to convert to array of size") + .null_count(), + 0 + ) + }) }); } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d6a0add9b253..986e54ebbe85 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -276,7 +276,10 @@ async fn prune_partitions( // Applies `filter` to `batch` returning `None` on error let do_filter = |filter| -> Option { let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?; - Some(expr.evaluate(&batch).ok()?.into_array(partitions.len())) + expr.evaluate(&batch) + .ok()? 
+ .into_array(partitions.len()) + .ok() }; //.Compute the conjunction of the filters, ignoring errors diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 3efb0df9df7c..68e996391cc3 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -336,7 +336,7 @@ impl PartitionColumnProjector { &mut self.key_buffer_cache, partition_value.as_ref(), file_batch.num_rows(), - ), + )?, ) } @@ -396,11 +396,11 @@ fn create_dict_array( dict_val: &ScalarValue, len: usize, data_type: DataType, -) -> ArrayRef +) -> Result where T: ArrowNativeType, { - let dict_vals = dict_val.to_array(); + let dict_vals = dict_val.to_array()?; let sliced_key_buffer = buffer_gen.get_buffer(len); @@ -409,16 +409,16 @@ where .len(len) .add_buffer(sliced_key_buffer); builder = builder.add_child_data(dict_vals.to_data()); - Arc::new(DictionaryArray::::from( + Ok(Arc::new(DictionaryArray::::from( builder.build().unwrap(), - )) + ))) } fn create_output_array( key_buffer_cache: &mut ZeroBufferGenerators, val: &ScalarValue, len: usize, -) -> ArrayRef { +) -> Result { if let ScalarValue::Dictionary(key_type, dict_val) = &val { match key_type.as_ref() { DataType::Int8 => { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 0f4b09caeded..5fe0a0a13a73 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { match self .physical_expr .evaluate(&batch) - .map(|v| v.into_array(batch.num_rows())) + .and_then(|v| v.into_array(batch.num_rows())) { Ok(array) => { let bool_arr = as_boolean_array(&array)?.clone(); diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 91bceed91602..dc6ef50bc101 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -405,7 +405,7 @@ macro_rules! get_min_max_values { .flatten() // column either didn't have statistics at all or didn't have min/max values .or_else(|| Some(null_scalar.clone())) - .map(|s| s.to_array()) + .and_then(|s| s.to_array().ok()) }} } @@ -425,7 +425,7 @@ macro_rules! get_null_count_values { }, ); - Some(value.to_array()) + value.to_array().ok() }}; } diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr/src/columnar_value.rs index c72aae69c831..7a2883928169 100644 --- a/datafusion/expr/src/columnar_value.rs +++ b/datafusion/expr/src/columnar_value.rs @@ -20,7 +20,7 @@ use arrow::array::ArrayRef; use arrow::array::NullArray; use arrow::datatypes::DataType; -use datafusion_common::ScalarValue; +use datafusion_common::{Result, ScalarValue}; use std::sync::Arc; /// Represents the result of evaluating an expression: either a single @@ -47,11 +47,15 @@ impl ColumnarValue { /// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is /// converted by repeating the same scalar multiple times. 
- pub fn into_array(self, num_rows: usize) -> ArrayRef { - match self { + /// + /// # Errors + /// + /// Errors if `self` is a Scalar that fails to be converted into an array of size + pub fn into_array(self, num_rows: usize) -> Result { + Ok(match self { ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows), - } + ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?, + }) } /// null columnar values are implemented as a null array in order to pass batch diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs index 4ea9ecea5fc6..de88396d9b0e 100644 --- a/datafusion/expr/src/window_state.rs +++ b/datafusion/expr/src/window_state.rs @@ -98,7 +98,7 @@ impl WindowAggState { } pub fn new(out_type: &DataType) -> Result { - let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0); + let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?; Ok(Self { window_frame_range: Range { start: 0, end: 0 }, window_frame_ctx: None, diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 468981a5fb0c..907c12b7afb1 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -1089,8 +1089,12 @@ mod tests { // Verify that calling the arrow // cast kernel yields the same results // input array - let literal_array = literal.to_array_of_size(1); - let expected_array = expected_value.to_array_of_size(1); + let literal_array = literal + .to_array_of_size(1) + .expect("Failed to convert to array of size"); + let expected_array = expected_value + .to_array_of_size(1) + .expect("Failed to convert to array of size"); let cast_array = cast_with_options( &literal_array, &target_type, diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs index 
475bfa4ce0da..61f2db5c8ef9 100644 --- a/datafusion/physical-expr/src/aggregate/correlation.rs +++ b/datafusion/physical-expr/src/aggregate/correlation.rs @@ -505,13 +505,17 @@ mod tests { let values1 = expr1 .iter() - .map(|e| e.evaluate(batch1)) - .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .map(|e| { + e.evaluate(batch1) + .and_then(|v| v.into_array(batch1.num_rows())) + }) .collect::>>()?; let values2 = expr2 .iter() - .map(|e| e.evaluate(batch2)) - .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .map(|e| { + e.evaluate(batch2) + .and_then(|v| v.into_array(batch2.num_rows())) + }) .collect::>>()?; accum1.update_batch(&values1)?; accum2.update_batch(&values2)?; diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs index 5e589d4e39fd..0f838eb6fa1c 100644 --- a/datafusion/physical-expr/src/aggregate/covariance.rs +++ b/datafusion/physical-expr/src/aggregate/covariance.rs @@ -754,13 +754,17 @@ mod tests { let values1 = expr1 .iter() - .map(|e| e.evaluate(batch1)) - .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .map(|e| { + e.evaluate(batch1) + .and_then(|v| v.into_array(batch1.num_rows())) + }) .collect::>>()?; let values2 = expr2 .iter() - .map(|e| e.evaluate(batch2)) - .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .map(|e| { + e.evaluate(batch2) + .and_then(|v| v.into_array(batch2.num_rows())) + }) .collect::>>()?; accum1.update_batch(&values1)?; accum2.update_batch(&values2)?; diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index a4e0a6dc49a9..0dc27dede8b6 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -587,7 +587,10 @@ mod tests { let mut states = vec![]; for idx in 0..state1.len() { - states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?); + states.push(concat(&[ + &state1[idx].to_array()?, 
+ &state2[idx].to_array()?, + ])?); } let mut first_accumulator = @@ -614,7 +617,10 @@ mod tests { let mut states = vec![]; for idx in 0..state1.len() { - states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?); + states.push(concat(&[ + &state1[idx].to_array()?, + &state2[idx].to_array()?, + ])?); } let mut last_accumulator = diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs index 330507d6ffa6..64e19ef502c7 100644 --- a/datafusion/physical-expr/src/aggregate/stddev.rs +++ b/datafusion/physical-expr/src/aggregate/stddev.rs @@ -445,13 +445,17 @@ mod tests { let values1 = expr1 .iter() - .map(|e| e.evaluate(batch1)) - .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .map(|e| { + e.evaluate(batch1) + .and_then(|v| v.into_array(batch1.num_rows())) + }) .collect::>>()?; let values2 = expr2 .iter() - .map(|e| e.evaluate(batch2)) - .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .map(|e| { + e.evaluate(batch2) + .and_then(|v| v.into_array(batch2.num_rows())) + }) .collect::>>()?; accum1.update_batch(&values1)?; accum2.update_batch(&values2)?; diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index da3a52713231..e5421ef5ab7e 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -36,11 +36,11 @@ use std::sync::Arc; pub fn get_accum_scalar_values_as_arrays( accum: &dyn Accumulator, ) -> Result> { - Ok(accum + accum .state()? 
.iter() .map(|s| s.to_array_of_size(1)) - .collect::>()) + .collect::>>() } /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs index a720dd833a87..d82c5ad5626f 100644 --- a/datafusion/physical-expr/src/aggregate/variance.rs +++ b/datafusion/physical-expr/src/aggregate/variance.rs @@ -519,13 +519,17 @@ mod tests { let values1 = expr1 .iter() - .map(|e| e.evaluate(batch1)) - .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .map(|e| { + e.evaluate(batch1) + .and_then(|v| v.into_array(batch1.num_rows())) + }) .collect::>>()?; let values2 = expr2 .iter() - .map(|e| e.evaluate(batch2)) - .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .map(|e| { + e.evaluate(batch2) + .and_then(|v| v.into_array(batch2.num_rows())) + }) .collect::>>()?; accum1.update_batch(&values1)?; accum2.update_batch(&values2)?; diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs index 37adb2d71ce8..a9a25ffe2ec1 100644 --- a/datafusion/physical-expr/src/conditional_expressions.rs +++ b/datafusion/physical-expr/src/conditional_expressions.rs @@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result { if value.is_null() { continue; } else { - let last_value = value.to_array_of_size(size); + let last_value = value.to_array_of_size(size)?; current_value = zip(&remainder, &last_value, current_value.as_ref())?; break; diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 3b61e7f48d59..5b597de78ac9 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { let array = match array { ColumnarValue::Array(array) => array.clone(), - ColumnarValue::Scalar(scalar) 
=> scalar.to_array(), + ColumnarValue::Scalar(scalar) => scalar.to_array()?, }; let arr = match date_part.to_lowercase().as_str() { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 63fa98011fdd..0a05a479e5a7 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -304,8 +304,8 @@ impl PhysicalExpr for BinaryExpr { // if both arrays or both literals - extract arrays and continue execution let (left, right) = ( - lhs.into_array(batch.num_rows()), - rhs.into_array(batch.num_rows()), + lhs.into_array(batch.num_rows())?, + rhs.into_array(batch.num_rows())?, ); self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type) .map(ColumnarValue::Array) @@ -597,7 +597,10 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?; - let result = lt.evaluate(&batch)?.into_array(batch.num_rows()); + let result = lt + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.len(), 5); let expected = [false, false, true, true, true]; @@ -641,7 +644,10 @@ mod tests { assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}")); - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.len(), 5); let expected = [true, true, false, true, false]; @@ -685,7 +691,7 @@ mod tests { assert_eq!(expression.data_type(&schema)?, $C_TYPE); // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array"); // verify that the array's data_type is correct assert_eq!(*result.data_type(), $C_TYPE); @@ -2138,7 +2144,10 @@ mod tests { let arithmetic_op = binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?; let batch = RecordBatch::try_new(schema, data)?; - let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = arithmetic_op + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.as_ref(), &expected); Ok(()) @@ -2154,7 +2163,10 @@ mod tests { let lit = Arc::new(Literal::new(literal)); let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?; let batch = RecordBatch::try_new(schema, data)?; - let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = arithmetic_op + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(&result, &expected); Ok(()) @@ -2170,7 +2182,10 @@ mod tests { let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?; let data: Vec = vec![left.clone(), right.clone()]; let batch = RecordBatch::try_new(schema.clone(), data)?; - let result = op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = op + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.as_ref(), &expected); Ok(()) @@ -2187,7 +2202,10 @@ mod tests { let scalar = lit(scalar.clone()); let op = binary_op(scalar, op, col("a", schema)?, schema)?; let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?; - let result = op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = op + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.as_ref(), expected); Ok(()) @@ -2204,7 +2222,10 @@ mod tests { let scalar = lit(scalar.clone()); let op = binary_op(col("a", schema)?, op, scalar, schema)?; let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?; - let result = op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = op + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.as_ref(), expected); Ok(()) @@ -2776,7 +2797,8 @@ mod tests { let result = expr .evaluate(&batch) .expect("evaluation") - .into_array(batch.num_rows()); + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let expected: Int32Array = input .into_iter() @@ -3255,7 +3277,10 @@ mod tests { let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?; let data: Vec = vec![left.clone(), right.clone()]; let batch = RecordBatch::try_new(schema.clone(), data)?; - let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows()); + let result = arithmetic_op + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert_eq!(result.as_ref(), expected.as_ref()); Ok(()) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index a2395c4a0ca2..5fcfd61d90e4 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -126,7 +126,7 @@ impl CaseExpr { let return_type = self.data_type(&batch.schema())?; let expr = self.expr.as_ref().unwrap(); let base_value = expr.evaluate(batch)?; - let base_value = base_value.into_array(batch.num_rows()); + let base_value = base_value.into_array(batch.num_rows())?; let base_nulls = is_null(base_value.as_ref())?; // start with nulls as default output @@ -137,7 +137,7 @@ impl CaseExpr { let when_value = self.when_then_expr[i] .0 .evaluate_selection(batch, &remainder)?; - let when_value = when_value.into_array(batch.num_rows()); + let when_value = when_value.into_array(batch.num_rows())?; // build boolean array representing which rows match the "when" value let when_match = eq(&when_value, &base_value)?; // Treat nulls as false @@ -153,7 +153,7 @@ impl CaseExpr { ColumnarValue::Scalar(value) if value.is_null() => { new_null_array(&return_type, batch.num_rows()) } - _ => then_value.into_array(batch.num_rows()), + _ => then_value.into_array(batch.num_rows())?, }; current_value = @@ -170,7 +170,7 @@ impl CaseExpr { remainder = or(&base_nulls, &remainder)?; let else_ = expr .evaluate_selection(batch, &remainder)? 
- .into_array(batch.num_rows()); + .into_array(batch.num_rows())?; current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?; } @@ -194,7 +194,7 @@ impl CaseExpr { let when_value = self.when_then_expr[i] .0 .evaluate_selection(batch, &remainder)?; - let when_value = when_value.into_array(batch.num_rows()); + let when_value = when_value.into_array(batch.num_rows())?; let when_value = as_boolean_array(&when_value).map_err(|e| { DataFusionError::Context( "WHEN expression did not return a BooleanArray".to_string(), @@ -214,7 +214,7 @@ impl CaseExpr { ColumnarValue::Scalar(value) if value.is_null() => { new_null_array(&return_type, batch.num_rows()) } - _ => then_value.into_array(batch.num_rows()), + _ => then_value.into_array(batch.num_rows())?, }; current_value = @@ -231,7 +231,7 @@ impl CaseExpr { .unwrap_or_else(|_| e.clone()); let else_ = expr .evaluate_selection(batch, &remainder)? - .into_array(batch.num_rows()); + .into_array(batch.num_rows())?; current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?; } @@ -425,7 +425,10 @@ mod tests { None, schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_int32_array(&result)?; let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]); @@ -453,7 +456,10 @@ mod tests { Some(else_value), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_int32_array(&result)?; let expected = @@ -485,7 +491,10 @@ mod tests { Some(else_value), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_float64_array(&result).expect("failed to downcast to Float64Array"); @@ -523,7 +532,10 @@ mod tests { None, schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_int32_array(&result)?; let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]); @@ -551,7 +563,10 @@ mod tests { Some(else_value), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_int32_array(&result)?; let expected = @@ -583,7 +598,10 @@ mod tests { Some(x), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_float64_array(&result).expect("failed to downcast to Float64Array"); @@ -629,7 +647,10 @@ mod tests { Some(else_value), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_int32_array(&result)?; let expected = @@ -661,7 +682,10 @@ mod tests { Some(else_value), schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_float64_array(&result).expect("failed to downcast to Float64Array"); @@ -693,7 +717,10 @@ mod tests { None, schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_float64_array(&result).expect("failed to downcast to Float64Array"); @@ -721,7 +748,10 @@ mod tests { None, schema.as_ref(), )?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_float64_array(&result).expect("failed to downcast to Float64Array"); diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 5d56af364636..780e042156b8 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -178,7 +178,7 @@ pub fn cast_column( kernels::cast::cast_with_options(array, cast_type, &cast_options)?, )), ColumnarValue::Scalar(scalar) => { - let scalar_array = scalar.to_array(); + let scalar_array = scalar.to_array()?; let cast_array = kernels::cast::cast_with_options( &scalar_array, cast_type, @@ -263,7 +263,10 @@ mod tests { assert_eq!(expression.data_type(&schema)?, $TYPE); // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // verify that the array's data_type is correct assert_eq!(*result.data_type(), $TYPE); @@ -312,7 +315,10 @@ mod tests { assert_eq!(expression.data_type(&schema)?, $TYPE); // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // verify that the array's data_type is correct assert_eq!(*result.data_type(), $TYPE); diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr/src/expressions/datum.rs index f57cbbd4ffa3..2bb79922cfec 100644 --- a/datafusion/physical-expr/src/expressions/datum.rs +++ b/datafusion/physical-expr/src/expressions/datum.rs @@ -34,14 +34,14 @@ pub(crate) fn apply( (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { Ok(ColumnarValue::Array(f(&left.as_ref(), &right.as_ref())?)) } - (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { - Ok(ColumnarValue::Array(f(&left.to_scalar(), &right.as_ref())?)) - } - (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { - Ok(ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar())?)) - } + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => Ok( + ColumnarValue::Array(f(&left.to_scalar()?, &right.as_ref())?), + ), + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => Ok( + ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar()?)?), + ), (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { - let array = f(&left.to_scalar(), &right.to_scalar())?; + let array = f(&left.to_scalar()?, &right.to_scalar()?)?; let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?; Ok(ColumnarValue::Scalar(scalar)) } diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs index df79e2835820..7d5f16c454d6 100644 --- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs +++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs @@ -183,7 +183,7 @@ impl PhysicalExpr for GetIndexedFieldExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let array = self.arg.evaluate(batch)?.into_array(batch.num_rows()); + let array = self.arg.evaluate(batch)?.into_array(batch.num_rows())?; 
match &self.field { GetFieldAccessExpr::NamedStructField{name} => match (array.data_type(), name) { (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { @@ -210,7 +210,7 @@ impl PhysicalExpr for GetIndexedFieldExpr { with utf8 indexes. Tried {dt:?} with {name:?} index"), }, GetFieldAccessExpr::ListIndex{key} => { - let key = key.evaluate(batch)?.into_array(batch.num_rows()); + let key = key.evaluate(batch)?.into_array(batch.num_rows())?; match (array.data_type(), key.data_type()) { (DataType::List(_), DataType::Int64) => Ok(ColumnarValue::Array(array_element(&[ array, key @@ -224,8 +224,8 @@ impl PhysicalExpr for GetIndexedFieldExpr { } }, GetFieldAccessExpr::ListRange{start, stop} => { - let start = start.evaluate(batch)?.into_array(batch.num_rows()); - let stop = stop.evaluate(batch)?.into_array(batch.num_rows()); + let start = start.evaluate(batch)?.into_array(batch.num_rows())?; + let stop = stop.evaluate(batch)?.into_array(batch.num_rows())?; match (array.data_type(), start.data_type(), stop.data_type()) { (DataType::List(_), DataType::Int64, DataType::Int64) => Ok(ColumnarValue::Array(array_slice(&[ array, start, stop @@ -326,7 +326,10 @@ mod tests { // only one row should be processed let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])?; let expr = Arc::new(GetIndexedFieldExpr::new_field(expr, "a")); - let result = expr.evaluate(&batch)?.into_array(1); + let result = expr + .evaluate(&batch)? + .into_array(1) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); assert_eq!(boolean, result.clone()); @@ -383,7 +386,10 @@ mod tests { vec![Arc::new(list_col), Arc::new(key_col)], )?; let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key)); - let result = expr.evaluate(&batch)?.into_array(1); + let result = expr + .evaluate(&batch)? 
+ .into_array(1) + .expect("Failed to convert to array"); let result = as_string_array(&result).expect("failed to downcast to ListArray"); let expected = StringArray::from(expected_list); assert_eq!(expected, result.clone()); @@ -419,7 +425,10 @@ mod tests { vec![Arc::new(list_col), Arc::new(start_col), Arc::new(stop_col)], )?; let expr = Arc::new(GetIndexedFieldExpr::new_range(expr, start, stop)); - let result = expr.evaluate(&batch)?.into_array(1); + let result = expr + .evaluate(&batch)? + .into_array(1) + .expect("Failed to convert to array"); let result = as_list_array(&result).expect("failed to downcast to ListArray"); let (expected, _, _) = build_list_arguments(expected_list, vec![None], vec![None]); @@ -440,7 +449,10 @@ mod tests { vec![Arc::new(list_builder.finish()), key_array], )?; let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key)); - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); assert!(result.is_null(0)); Ok(()) } @@ -461,7 +473,10 @@ mod tests { vec![Arc::new(list_builder.finish()), Arc::new(key_array)], )?; let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key)); - let result = expr.evaluate(&batch)?.into_array(1); + let result = expr + .evaluate(&batch)? 
+ .into_array(1) + .expect("Failed to convert to array"); assert!(result.is_null(0)); Ok(()) } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 8d55fb70bd9e..625b01ec9a7e 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -351,15 +351,15 @@ impl PhysicalExpr for InListExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let value = self.expr.evaluate(batch)?; let r = match &self.static_filter { - Some(f) => f.contains(value.into_array(1).as_ref(), self.negated)?, + Some(f) => f.contains(value.into_array(1)?.as_ref(), self.negated)?, None => { - let value = value.into_array(batch.num_rows()); + let value = value.into_array(batch.num_rows())?; let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold( BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None), |result, expr| -> Result { Ok(or_kleene( &result, - &eq(&value, &expr?.into_array(batch.num_rows()))?, + &eq(&value, &expr?.into_array(batch.num_rows())?)?, )?) }, )?; @@ -501,7 +501,10 @@ mod tests { ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{ let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?; let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap(); - let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows()); + let result = expr + .evaluate(&$BATCH)? 
+ .into_array($BATCH.num_rows()) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); let expected = &BooleanArray::from($EXPECTED); diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index da717a517fb3..2e6a2bec9cab 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -132,7 +132,10 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?; // expression: "a is not null" - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index ee7897edd4de..3ad4058dd649 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -134,7 +134,10 @@ mod tests { let expr = is_null(col("a", &schema)?).unwrap(); let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index e833eabbfff2..37452e278484 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -201,7 +201,10 @@ mod test { )?; // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); let expected = &BooleanArray::from($VEC); diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 91cb23d5864e..cd3b51f09105 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -131,7 +131,10 @@ mod tests { let literal_expr = lit(42i32); assert_eq!("42", format!("{literal_expr}")); - let literal_array = literal_expr.evaluate(&batch)?.into_array(batch.num_rows()); + let literal_array = literal_expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let literal_array = as_int32_array(&literal_array)?; // note that the contents of the literal array are unrelated to the batch contents except for the length of the array diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c44b3cf01d36..1919cac97986 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -247,8 +247,10 @@ pub(crate) mod tests { let expr = agg.expressions(); let values = expr .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|e| { + e.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect::>>()?; accum.update_batch(&values)?; accum.evaluate() @@ -262,8 +264,10 @@ pub(crate) mod tests { let expr = agg.expressions(); let values = expr .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|e| { + e.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect::>>()?; let indices = vec![0; batch.num_rows()]; accum.update_batch(&values, &indices, None, 1)?; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 86b000e76a32..65b347941163 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -195,7 +195,7 @@ mod tests { let expected = &paste!{[<$DATA_TY Array>]::from(arr_expected)}; let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array"); let result = as_primitive_array(&result).expect(format!("failed to downcast to {:?}Array", $DATA_TY).as_str()); assert_eq!(result, expected); diff --git 
a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index c154fad10037..4ceccc6932fe 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -150,7 +150,10 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); let result = as_boolean_array(&result).expect("failed to downcast to BooleanArray"); assert_eq!(result, expected); diff --git a/datafusion/physical-expr/src/expressions/nullif.rs b/datafusion/physical-expr/src/expressions/nullif.rs index 7bbe9d73d435..252bd10c3e73 100644 --- a/datafusion/physical-expr/src/expressions/nullif.rs +++ b/datafusion/physical-expr/src/expressions/nullif.rs @@ -37,7 +37,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result { match (lhs, rhs) { (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => { - let rhs = rhs.to_scalar(); + let rhs = rhs.to_scalar()?; let array = nullif(lhs, &eq(&lhs, &rhs)?)?; Ok(ColumnarValue::Array(array)) @@ -47,7 +47,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result { Ok(ColumnarValue::Array(array)) } (ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => { - let lhs = lhs.to_array_of_size(rhs.len()); + let lhs = lhs.to_array_of_size(rhs.len())?; let array = nullif(&lhs, &eq(&lhs, &rhs)?)?; Ok(ColumnarValue::Array(array)) } @@ -89,7 +89,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); let result = nullif_func(&[a, lit_array])?; - let result = result.into_array(0); + let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ Some(1), @@ -115,7 +115,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32))); let result = 
nullif_func(&[a, lit_array])?; - let result = result.into_array(0); + let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ None, @@ -140,7 +140,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))); let result = nullif_func(&[a, lit_array])?; - let result = result.into_array(0); + let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(BooleanArray::from(vec![Some(true), None, None])) as ArrayRef; @@ -157,7 +157,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Utf8(Some("bar".to_string()))); let result = nullif_func(&[a, lit_array])?; - let result = result.into_array(0); + let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(StringArray::from(vec![ Some("foo"), @@ -178,7 +178,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); let result = nullif_func(&[lit_array, a])?; - let result = result.into_array(0); + let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ Some(2), @@ -198,7 +198,7 @@ mod tests { let b_eq = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); let result_eq = nullif_func(&[a_eq, b_eq])?; - let result_eq = result_eq.into_array(1); + let result_eq = result_eq.into_array(1).expect("Failed to convert to array"); let expected_eq = Arc::new(Int32Array::from(vec![None])) as ArrayRef; @@ -208,7 +208,9 @@ mod tests { let b_neq = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32))); let result_neq = nullif_func(&[a_neq, b_neq])?; - let result_neq = result_neq.into_array(1); + let result_neq = result_neq + .into_array(1) + .expect("Failed to convert to array"); let expected_neq = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef; assert_eq!(expected_neq.as_ref(), result_neq.as_ref()); diff --git 
a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index cba026c56513..dea7f9f86a62 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -89,7 +89,7 @@ impl PhysicalExpr for TryCastExpr { Ok(ColumnarValue::Array(cast)) } ColumnarValue::Scalar(scalar) => { - let array = scalar.to_array(); + let array = scalar.to_array()?; let cast_array = cast_with_options(&array, &self.cast_type, &options)?; let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?; Ok(ColumnarValue::Scalar(cast_scalar)) @@ -187,7 +187,10 @@ mod tests { assert_eq!(expression.data_type(&schema)?, $TYPE); // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // verify that the array's data_type is correct assert_eq!(*result.data_type(), $TYPE); @@ -235,7 +238,10 @@ mod tests { assert_eq!(expression.data_type(&schema)?, $TYPE); // compute - let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expression + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // verify that the array's data_type is correct assert_eq!(*result.data_type(), $TYPE); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index c973232c75a6..9185ade313eb 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -239,7 +239,7 @@ where }; arg.clone().into_array(expansion_len) }) - .collect::>(); + .collect::>>()?; let result = (inner)(&args); @@ -937,7 +937,7 @@ mod tests { match expected { Ok(expected) => { let result = expr.evaluate(&batch)?; - let result = result.into_array(batch.num_rows()); + let result = result.into_array(batch.num_rows()).expect("Failed to convert to array"); let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap(); // value is correct @@ -2906,7 +2906,10 @@ mod tests { // evaluate works let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // downcast works let result = as_list_array(&result)?; @@ -2945,7 +2948,10 @@ mod tests { // evaluate works let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?; - let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); + let result = expr + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); // downcast works let result = as_list_array(&result)?; @@ -3017,8 +3023,11 @@ mod tests { let adapter_func = make_scalar_function(dummy_function); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; assert_eq!(result, vec![5, 5]); @@ -3030,8 +3039,11 @@ mod tests { let adapter_func = make_scalar_function_with_hints(dummy_function, vec![]); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; assert_eq!(result, vec![5, 5]); @@ -3046,8 +3058,11 @@ mod tests { ); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; assert_eq!(result, vec![5, 1]); @@ -3056,8 +3071,11 @@ mod tests { #[test] fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> { - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let adapter_func = 
make_scalar_function_with_hints( dummy_function, vec![Hint::Pad, Hint::AcceptsSingular], @@ -3077,8 +3095,11 @@ mod tests { ); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[ array_arg, scalar_arg.clone(), @@ -3097,8 +3118,11 @@ mod tests { ); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[ array_arg.clone(), scalar_arg.clone(), @@ -3125,8 +3149,11 @@ mod tests { ); let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); - let array_arg = - ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)); + let array_arg = ColumnarValue::Array( + ScalarValue::Int64(Some(1)) + .to_array_of_size(5) + .expect("Failed to convert to array of size"), + ); let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?; assert_eq!(result, vec![5, 1]); diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs index 1ea9b2d9aee6..4b81adfbb1f8 100644 --- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs +++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs @@ -750,7 +750,7 @@ fn cast_scalar_value( data_type: &DataType, cast_options: &CastOptions, ) -> Result { - let cast_array = cast_with_options(&value.to_array(), data_type, cast_options)?; + let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?; 
ScalarValue::try_from_array(&cast_array, 0) } diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 0b7bc34014f9..af66862aecc5 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -769,7 +769,8 @@ mod tests { let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; let array = random(&args) .expect("failed to initialize function random") - .into_array(1); + .into_array(1) + .expect("Failed to convert to array"); let floats = as_float64_array(&array).expect("failed to initialize function random"); diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 64c1d0be0455..f318cd3b0f4d 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -472,7 +472,7 @@ mod tests { ]))], )?; let result = p.evaluate(&batch)?; - let result = result.into_array(4); + let result = result.into_array(4).expect("Failed to convert to array"); assert_eq!( &result, diff --git a/datafusion/physical-expr/src/struct_expressions.rs b/datafusion/physical-expr/src/struct_expressions.rs index baa29d668e90..0eed1d16fba8 100644 --- a/datafusion/physical-expr/src/struct_expressions.rs +++ b/datafusion/physical-expr/src/struct_expressions.rs @@ -67,13 +67,15 @@ fn array_struct(args: &[ArrayRef]) -> Result { /// put values in a struct array. 
pub fn struct_expr(values: &[ColumnarValue]) -> Result { - let arrays: Vec = values + let arrays = values .iter() - .map(|x| match x { - ColumnarValue::Array(array) => array.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().clone(), + .map(|x| { + Ok(match x { + ColumnarValue::Array(array) => array.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array()?.clone(), + }) }) - .collect(); + .collect::>>()?; Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?)) } @@ -93,7 +95,8 @@ mod tests { ]; let struc = struct_expr(&args) .expect("failed to initialize function struct") - .into_array(1); + .into_array(1) + .expect("Failed to convert to array"); let result = as_struct_array(&struc).expect("failed to initialize function struct"); assert_eq!( diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 66ffa990b78b..7aa4f6536a6e 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -60,8 +60,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { fn evaluate_args(&self, batch: &RecordBatch) -> Result> { self.expressions() .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|e| { + e.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect() } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index f55f1600b9ca..d22660d41ebd 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -139,6 +139,7 @@ fn create_empty_array( let array = value .as_ref() .map(|scalar| scalar.to_array_of_size(size)) + .transpose()? 
.unwrap_or_else(|| new_null_array(data_type, size)); if array.data_type() != data_type { cast(&array, data_type).map_err(DataFusionError::ArrowError) diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 9b0a02d329c4..b282e3579754 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -82,8 +82,10 @@ pub trait WindowExpr: Send + Sync + Debug { fn evaluate_args(&self, batch: &RecordBatch) -> Result> { self.expressions() .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|e| { + e.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4052d6aef0ae..3ac812929772 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1064,10 +1064,11 @@ fn finalize_aggregation( // build the vector of states let a = accumulators .iter() - .map(|accumulator| accumulator.state()) - .map(|value| { - value.map(|e| { - e.iter().map(|v| v.to_array()).collect::>() + .map(|accumulator| { + accumulator.state().and_then(|e| { + e.iter() + .map(|v| v.to_array()) + .collect::>>() }) }) .collect::>>()?; @@ -1080,7 +1081,7 @@ fn finalize_aggregation( // merge the state to the final value accumulators .iter() - .map(|accumulator| accumulator.evaluate().map(|v| v.to_array())) + .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array())) .collect::>>() } } @@ -1092,9 +1093,11 @@ fn evaluate( batch: &RecordBatch, ) -> Result> { expr.iter() - .map(|expr| expr.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>() + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) + .collect() } /// Evaluates expressions against a record batch. 
@@ -1114,9 +1117,11 @@ fn evaluate_optional( expr.iter() .map(|expr| { expr.as_ref() - .map(|expr| expr.evaluate(batch)) + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .transpose() - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) }) .collect::>>() } @@ -1140,7 +1145,7 @@ pub(crate) fn evaluate_group_by( .iter() .map(|(expr, _)| { let value = expr.evaluate(batch)?; - Ok(value.into_array(batch.num_rows())) + value.into_array(batch.num_rows()) }) .collect::>>()?; @@ -1149,7 +1154,7 @@ pub(crate) fn evaluate_group_by( .iter() .map(|(expr, _)| { let value = expr.evaluate(batch)?; - Ok(value.into_array(batch.num_rows())) + value.into_array(batch.num_rows()) }) .collect::>>()?; diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 32c0bbc78a5d..90eb488a2ead 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -217,8 +217,10 @@ fn aggregate_batch( // 1.3 let values = &expr .iter() - .map(|e| e.evaluate(&batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|e| { + e.evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect::>>()?; // 1.4 diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0c44b367e514..d560a219f230 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -300,7 +300,7 @@ pub(crate) fn batch_filter( ) -> Result { predicate .evaluate(batch) - .map(|v| v.into_array(batch.num_rows())) + .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { Ok(as_boolean_array(&array)?) 
// apply filter array to record batch diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 102f0c42e90c..4c928d44caf4 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -344,7 +344,7 @@ fn build_batch( .iter() .map(|arr| { let scalar = ScalarValue::try_from_array(arr, left_index)?; - Ok(scalar.to_array_of_size(batch.num_rows())) + scalar.to_array_of_size(batch.num_rows()) }) .collect::>>()?; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 1a2db87d98a2..546a929bf939 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -713,7 +713,7 @@ where // evaluate the keys let keys_values = on .iter() - .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) + .map(|c| c.evaluate(batch)?.into_array(batch.num_rows())) .collect::>>()?; // calculate the hash values @@ -857,13 +857,13 @@ pub fn build_equal_condition_join_indices( ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() - .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) + .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())) .collect::>>()?; let build_join_values = build_on .iter() .map(|c| { - Ok(c.evaluate(build_input_buffer)? - .into_array(build_input_buffer.num_rows())) + c.evaluate(build_input_buffer)? + .into_array(build_input_buffer.num_rows()) }) .collect::>>()?; hashes_buffer.clear(); diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs index c134b23d78cf..5ebf370b6d71 100644 --- a/datafusion/physical-plan/src/joins/hash_join_utils.rs +++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs @@ -607,7 +607,7 @@ pub fn update_filter_expr_interval( .origin_sorted_expr() .expr .evaluate(batch)? 
- .into_array(1); + .into_array(1)?; // Convert the array to a ScalarValue: let value = ScalarValue::try_from_array(&array, 0)?; // Create a ScalarValue representing positive or negative infinity for the same data type: diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1306a4874436..51561f5dab24 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -626,7 +626,9 @@ impl Stream for SymmetricHashJoinStream { /// # Returns /// /// A [Result] object that contains the pruning length. The function will return -/// an error if there is an issue evaluating the build side filter expression. +/// an error if +/// - there is an issue evaluating the build side filter expression; +/// - there is an issue converting the build side filter expression into an array fn determine_prune_length( buffer: &RecordBatch, build_side_filter_expr: &SortedFilterExpr, @@ -637,7 +639,7 @@ fn determine_prune_length( let batch_arr = origin_sorted_expr .expr .evaluate(buffer)? - .into_array(buffer.num_rows()); + .into_array(buffer.num_rows())?; // Get the lower or upper interval based on the sort direction let target = if origin_sorted_expr.options.descending { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5efeedfe6536..f93f08255e0c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -778,7 +778,7 @@ pub(crate) fn apply_join_filter_to_indices( let filter_result = filter .expression() .evaluate(&intermediate_batch)? 
- .into_array(intermediate_batch.num_rows()); + .into_array(intermediate_batch.num_rows())?; let mask = as_boolean_array(&filter_result)?; let left_filtered = compute::filter(&build_indices, mask)?; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index bbf0d6d4b31c..b8e2d0e425d4 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -310,8 +310,10 @@ impl ProjectionStream { let arrays = self .expr .iter() - .map(|expr| expr.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) .collect::>>()?; if arrays.is_empty() { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 66f7037e5c2d..9836e057ff87 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -169,9 +169,7 @@ impl BatchPartitioner { let arrays = exprs .iter() - .map(|expr| { - Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())) - }) + .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows())) .collect::>>()?; hash_buffer.clear(); diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 4cabdc6e178c..135b4fbdece4 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -118,7 +118,7 @@ impl RowCursorStream { let cols = self .column_expressions .iter() - .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows()))) + .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows())) .collect::>>()?; let rows = self.converter.convert_columns(&cols)?; @@ -181,7 +181,7 @@ impl FieldCursorStream { fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { let value = self.sort.expr.evaluate(batch)?; - let array = value.into_array(batch.num_rows()); + let 
array = value.into_array(batch.num_rows())?; let array = array.as_any().downcast_ref::().expect("field values"); Ok(ArrayValues::new(self.sort.options, array)) } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 4638c0dcf264..9120566273d3 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -153,7 +153,7 @@ impl TopK { .iter() .map(|expr| { let value = expr.expr.evaluate(&batch)?; - Ok(value.into_array(batch.num_rows())) + value.into_array(batch.num_rows()) }) .collect::>>()?; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index c9f3fb76c2e5..af4a81626cd7 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -242,7 +242,7 @@ fn build_batch( column: &Column, options: &UnnestOptions, ) -> Result { - let list_array = column.evaluate(batch)?.into_array(batch.num_rows()); + let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?; match list_array.data_type() { DataType::List(_) => { let list_array = list_array.as_any().downcast_ref::().unwrap();