From 6857d6df02d71a8ab100e4d9ddc0b44b42a7908a Mon Sep 17 00:00:00 2001 From: Igor Izvekov Date: Sun, 9 Jul 2023 19:23:26 +0300 Subject: [PATCH] feat: column support for `array_dims`, `array_ndims`, `cardinality` and `array_length` (#6864) * feat: column support for array_dims, array_ndims, cardinality and array_length * feat: sqllogictests for array_dims, array_ndims, array_length and cardinality * fix: merge main --- .../tests/sqllogictests/test_files/array.slt | 78 +++- datafusion/expr/src/built_in_function.rs | 8 +- .../physical-expr/src/array_expressions.rs | 423 +++++++----------- datafusion/physical-expr/src/functions.rs | 8 +- 4 files changed, 254 insertions(+), 263 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/array.slt b/datafusion/core/tests/sqllogictests/test_files/array.slt index 2d7a6099893c..b46875725fc8 100644 --- a/datafusion/core/tests/sqllogictests/test_files/array.slt +++ b/datafusion/core/tests/sqllogictests/test_files/array.slt @@ -505,7 +505,19 @@ select cardinality(make_array([1, 2], [3, 4], [5, 6])), cardinality(array_fill(3 query II select cardinality(make_array()), cardinality(make_array(make_array())) ---- -0 0 +NULL 0 + +# cardinality with columns +query III +select cardinality(column1), cardinality(column2), cardinality(column3) from arrays; +---- +4 3 5 +4 3 5 +4 3 5 +4 3 3 +NULL 3 4 +4 NULL 1 +4 3 NULL ## trim_array @@ -565,13 +577,39 @@ select array_length(make_array()), array_length(make_array(), 1), array_length(m ---- 0 0 NULL +# array_length with columns +query I +select array_length(column1, column3) from arrays_values; +---- +10 +NULL +NULL +NULL +NULL +NULL +NULL +NULL + +# array_length with columns and scalars +query II +select array_length(array[array[1, 2], array[3, 4]], column3), array_length(column1, 1) from arrays_values; +---- +2 10 +2 10 +NULL 10 +NULL 10 +NULL NULL +NULL 10 +NULL 10 +NULL 10 + ## array_dims # array_dims scalar function -query error DataFusion error: SQL error: ParserError\("Expected an SQL statement, found: caused"\) -caused by -Error during planning: Cannot automatically convert List\(Field \{ name: "item", data_type: UInt8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) to UInt8 +query ??? select array_dims(make_array(1, 2, 3)), array_dims(make_array([1, 2], [3, 4])), array_dims(make_array([[[[1], [2]]]])); +---- +[3] [2, 2] [1, 1, 1, 2, 1] # array_dims scalar function #2 query error DataFusion error: SQL error: ParserError\("Expected an SQL statement, found: caused"\) @@ -580,10 +618,22 @@ Error during planning: Cannot automatically convert List\(Field \{ name: "item", select array_dims(array_fill(2, [1, 2, 3])), array_dims(array_fill(3, [2, 5, 4])); # array_dims scalar function #3 -query error DataFusion error: SQL error: ParserError\("Expected an SQL statement, found: caused"\) -caused by -Error during planning: Cannot automatically convert List\(Field \{ name: "item", data_type: UInt8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) to UInt8 +query ?? select array_dims(make_array()), array_dims(make_array(make_array())) +---- +NULL [1, 0] + +# array_dims with columns +query ??? +select array_dims(column1), array_dims(column2), array_dims(column3) from arrays; +---- +[2, 2] [3] [5] +[2, 2] [3] [5] +[2, 2] [3] [5] +[2, 2] [3] [3] +NULL [3] [4] +[2, 2] NULL [1] +[2, 2] [3] NULL ## array_ndims @@ -603,7 +653,19 @@ select array_ndims(array_fill(1, [1, 2, 3])), array_ndims([[[[[[[[[[[[[[[[[[[[[1 query II select array_ndims(make_array()), array_ndims(make_array(make_array())) ---- -1 2 +NULL 2 + +# array_ndims with columns +query III +select array_ndims(column1), array_ndims(column2), array_ndims(column3) from arrays; +---- +2 1 1 +2 1 1 +2 1 1 +2 1 1 +NULL 1 1 +2 NULL 1 +2 1 NULL ## array_contains diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 103f1047fa48..9acb82d47b05 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -480,14 +480,16 @@ impl BuiltinScalarFunction { Ok(List(Arc::new(Field::new("item", expr_type, true)))) } BuiltinScalarFunction::ArrayContains => Ok(Boolean), - BuiltinScalarFunction::ArrayDims => Ok(UInt8), + BuiltinScalarFunction::ArrayDims => { + Ok(List(Arc::new(Field::new("item", UInt64, true)))) + } BuiltinScalarFunction::ArrayFill => Ok(List(Arc::new(Field::new( "item", input_expr_types[1].clone(), true, )))), - BuiltinScalarFunction::ArrayLength => Ok(UInt8), - BuiltinScalarFunction::ArrayNdims => Ok(UInt8), + BuiltinScalarFunction::ArrayLength => Ok(UInt64), + BuiltinScalarFunction::ArrayNdims => Ok(UInt64), BuiltinScalarFunction::ArrayPosition => Ok(UInt64), BuiltinScalarFunction::ArrayPositions => { Ok(List(Arc::new(Field::new("item", UInt64, true)))) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index cbf5896b85f4..6af87b506ea8 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -20,7 +20,7 @@ use arrow::array::*; use arrow::buffer::{Buffer, OffsetBuffer}; use arrow::compute; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, UInt64Type}; use core::any::type_name; use datafusion_common::cast::{as_generic_string_array, as_int64_array, as_list_array}; use datafusion_common::ScalarValue; @@ -99,6 +99,86 @@ macro_rules! array { }}; } +/// Returns the length of a concrete array dimension +fn compute_array_length( + arr: Option, + dimension: Option, +) -> Result> { + let mut current_dimension: i64 = 1; + let mut value = match arr { + Some(arr) => arr, + None => return Ok(None), + }; + let dimension = match dimension { + Some(value) => { + if value < 1 { + return Ok(None); + } + + value + } + None => return Ok(None), + }; + + loop { + if current_dimension == dimension { + return Ok(Some(value.len() as u64)); + } + + match value.data_type() { + DataType::List(..) => { + value = downcast_arg!(value, ListArray).value(0); + current_dimension += 1; + } + _ => return Ok(None), + } + } +} + +/// Returns the dimension of the array +fn compute_array_ndims(arr: Option) -> Result> { + let mut res: u64 = 1; + let mut value = match arr { + Some(arr) => arr, + None => return Ok(None), + }; + if value.is_empty() { + return Ok(None); + } + + loop { + match value.data_type() { + DataType::List(..) => { + value = downcast_arg!(value, ListArray).value(0); + res += 1; + } + _ => return Ok(Some(res)), + } + } +} + +/// Returns the length of each array dimension +fn compute_array_dims(arr: Option) -> Result>>> { + let mut value = match arr { + Some(arr) => arr, + None => return Ok(None), + }; + if value.is_empty() { + return Ok(None); + } + let mut res = vec![Some(value.len() as u64)]; + + loop { + match value.data_type() { + DataType::List(..) => { + value = downcast_arg!(value, ListArray).value(0); + res.push(Some(value.len() as u64)); + } + _ => return Ok(Some(res)), + } + } +} + fn array_array(args: &[ArrayRef], data_type: DataType) -> Result { // do not accept 0 arguments. if args.is_empty() { @@ -373,47 +453,22 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result { Ok(res) } -fn compute_array_ndims(arg: u8, arr: ArrayRef) -> Result { - match arr.data_type() { - DataType::List(..) => { - let list_array = downcast_arg!(arr, ListArray); - compute_array_ndims(arg + 1, list_array.value(0)) - } - DataType::Null - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Boolean - | DataType::Float32 - | DataType::Float64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 => Ok(arg), - data_type => Err(DataFusionError::NotImplemented(format!( - "Array is not implemented for type '{data_type:?}'." - ))), - } -} - fn align_array_dimensions(args: Vec) -> Result> { // Find the maximum number of dimensions - let max_ndim: u8 = *args + let max_ndim: u64 = (*args .iter() - .map(|arr| compute_array_ndims(0, arr.clone())) - .collect::>>()? + .map(|arr| compute_array_ndims(Some(arr.clone()))) + .collect::>>>()? .iter() .max() - .unwrap(); + .unwrap()) + .unwrap(); // Align the dimensions of the arrays let aligned_args: Result> = args .into_iter() .map(|array| { - let ndim = compute_array_ndims(0, array.clone())?; + let ndim = compute_array_ndims(Some(array.clone()))?.unwrap(); if ndim < max_ndim { let mut aligned_array = array.clone(); for _ in 0..(max_ndim - ndim) { @@ -605,10 +660,11 @@ pub fn array_position(args: &[ArrayRef]) -> Result { let arr = as_list_array(&args[0])?; let element = &args[1]; - let mut index = Int64Array::from_value(0, arr.len()); - if args.len() == 3 { - index = as_int64_array(&args[2])?.clone(); - } + let index = if args.len() == 3 { + as_int64_array(&args[2])?.clone() + } else { + Int64Array::from_value(0, arr.len()) + }; let res = match arr.data_type() { DataType::List(field) => match field.data_type() { @@ -1107,184 +1163,61 @@ pub fn trim_array(args: &[ArrayRef]) -> Result { } /// Cardinality SQL function -pub fn cardinality(args: &[ColumnarValue]) -> Result { - let arr = match &args[0] { - ColumnarValue::Scalar(scalar) => scalar.to_array().clone(), - ColumnarValue::Array(arr) => arr.clone(), - }; +pub fn cardinality(args: &[ArrayRef]) -> Result { + let list_array = as_list_array(&args[0])?.clone(); - fn compute_cardinality(arg: &mut u64, arr: ArrayRef) -> Result<&mut u64> { - match arr.data_type() { - DataType::List(..) => { - let list_array = downcast_arg!(arr, ListArray); - for i in 0..list_array.len() { - compute_cardinality(arg, list_array.value(i))?; - } + let result = list_array + .iter() + .map(|arr| match compute_array_dims(arr)? { + Some(vector) => Ok(Some(vector.iter().map(|x| x.unwrap()).product::())), + None => Ok(None), + }) + .collect::>()?; - Ok(arg) - } - DataType::Null - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Boolean - | DataType::Float32 - | DataType::Float64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 => { - *arg += arr.len() as u64; - Ok(arg) - } - data_type => Err(DataFusionError::NotImplemented(format!( - "Array is not implemented for type '{data_type:?}'." - ))), - } - } - let mut arg: u64 = 0; - Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(vec![ - *compute_cardinality(&mut arg, arr)?, - ])))) + Ok(Arc::new(result) as ArrayRef) } /// Array_length SQL function -pub fn array_length(args: &[ColumnarValue]) -> Result { - let arr = match &args[0] { - ColumnarValue::Scalar(scalar) => scalar.to_array().clone(), - ColumnarValue::Array(arr) => arr.clone(), +pub fn array_length(args: &[ArrayRef]) -> Result { + let list_array = as_list_array(&args[0])?; + let dimension = if args.len() == 2 { + as_int64_array(&args[1])?.clone() + } else { + Int64Array::from_value(1, list_array.len()) }; - let mut element: u8 = 1; - if args.len() == 2 { - let scalar = match &args[1] { - ColumnarValue::Scalar(scalar) => scalar.clone(), - _ => { - return Err(DataFusionError::Internal( - "Array_length function requires positive integer scalar element" - .to_string(), - )) - } - }; - - element = match scalar { - ScalarValue::Int8(Some(value)) => value as u8, - ScalarValue::Int16(Some(value)) => value as u8, - ScalarValue::Int32(Some(value)) => value as u8, - ScalarValue::Int64(Some(value)) => value as u8, - ScalarValue::UInt8(Some(value)) => value, - ScalarValue::UInt16(Some(value)) => value as u8, - ScalarValue::UInt32(Some(value)) => value as u8, - ScalarValue::UInt64(Some(value)) => value as u8, - _ => { - return Err(DataFusionError::Internal( - "Array_length function requires positive integer scalar element" - .to_string(), - )) - } - }; - if element == 0 { - return Err(DataFusionError::Internal( - "Array_length function requires positive integer scalar element" - .to_string(), - )); - } - } + let result = list_array + .iter() + .zip(dimension.iter()) + .map(|(arr, dim)| compute_array_length(arr, dim)) + .collect::>()?; - fn compute_array_length(arg: u8, array: ArrayRef, element: u8) -> Result> { - match array.data_type() { - DataType::List(..) => { - let list_array = downcast_arg!(array, ListArray); - if arg == element + 1 { - Ok(Some(list_array.len() as u8)) - } else { - compute_array_length(arg + 1, list_array.value(0), element) - } - } - DataType::Null - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Boolean - | DataType::Float32 - | DataType::Float64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 => { - if arg == element + 1 { - Ok(Some(array.len() as u8)) - } else { - Ok(None) - } - } - data_type => Err(DataFusionError::NotImplemented(format!( - "Array is not implemented for type '{data_type:?}'." - ))), - } - } - let arg: u8 = 1; - Ok(ColumnarValue::Array(Arc::new(UInt8Array::from(vec![ - compute_array_length(arg, arr, element)?, - ])))) + Ok(Arc::new(result) as ArrayRef) } /// Array_dims SQL function pub fn array_dims(args: &[ArrayRef]) -> Result { - fn compute_array_dims( - arg: &mut Vec, - arr: ArrayRef, - ) -> Result<&mut Vec> { - match arr.data_type() { - DataType::List(..) => { - let list_array = downcast_arg!(arr, ListArray).value(0); - arg.push(ColumnarValue::Scalar(ScalarValue::UInt8(Some( - list_array.len() as u8, - )))); - return compute_array_dims(arg, list_array); - } - DataType::Null - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Boolean - | DataType::Float32 - | DataType::Float64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 => Ok(arg), - data_type => Err(DataFusionError::NotImplemented(format!( - "Array is not implemented for type '{data_type:?}'." - ))), - } - } + let list_array = as_list_array(&args[0])?; - let mut arg: Vec = vec![]; - Ok(array( - compute_array_dims(&mut arg, args[0].clone())? - .clone() - .as_slice(), - )? - .into_array(1)) + let data = list_array + .iter() + .map(compute_array_dims) + .collect::>>()?; + let result = ListArray::from_iter_primitive::(data); + + Ok(Arc::new(result) as ArrayRef) } /// Array_ndims SQL function pub fn array_ndims(args: &[ArrayRef]) -> Result { - let arg: u8 = 0; - Ok(Arc::new(UInt8Array::from(vec![compute_array_ndims( - arg, - args[0].clone(), - )?]))) + let list_array = as_list_array(&args[0])?; + + let result = list_array + .iter() + .map(compute_array_ndims) + .collect::>()?; + + Ok(Arc::new(result) as ArrayRef) } macro_rules! contains { @@ -1354,10 +1287,9 @@ pub fn array_contains(args: &[ArrayRef]) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow::array::UInt8Array; use arrow::datatypes::Int64Type; use datafusion_common::cast::{ - as_generic_string_array, as_list_array, as_uint64_array, as_uint8_array, + as_generic_string_array, as_list_array, as_uint64_array, }; use datafusion_common::scalar::ScalarValue; @@ -1792,10 +1724,9 @@ mod tests { #[test] fn test_cardinality() { // cardinality([1, 2, 3, 4]) = 4 - let list_array = return_array(); + let list_array = return_array().into_array(1); let arr = cardinality(&[list_array]) - .expect("failed to initialize function cardinality") - .into_array(1); + .expect("failed to initialize function cardinality"); let result = as_uint64_array(&arr).expect("failed to initialize function cardinality"); @@ -1805,10 +1736,9 @@ mod tests { #[test] fn test_nested_cardinality() { // cardinality([[1, 2, 3, 4], [5, 6, 7, 8]]) = 8 - let list_array = return_nested_array(); + let list_array = return_nested_array().into_array(1); let arr = cardinality(&[list_array]) - .expect("failed to initialize function cardinality") - .into_array(1); + .expect("failed to initialize function cardinality"); let result = as_uint64_array(&arr).expect("failed to initialize function cardinality"); @@ -1818,67 +1748,60 @@ mod tests { #[test] fn test_array_length() { // array_length([1, 2, 3, 4]) = 4 - let list_array = return_array(); - let array = array_length(&[list_array.clone()]) - .expect("failed to initialize function array_ndims") - .into_array(1); + let list_array = return_array().into_array(1); + let arr = array_length(&[list_array.clone()]) + .expect("failed to initialize function array_ndims"); let result = - as_uint8_array(&array).expect("failed to initialize function array_ndims"); + as_uint64_array(&arr).expect("failed to initialize function array_ndims"); - assert_eq!(result, &UInt8Array::from(vec![4])); + assert_eq!(result, &UInt64Array::from_value(4, 1)); // array_length([1, 2, 3, 4], 1) = 4 - let array = array_length(&[ - list_array, - ColumnarValue::Scalar(ScalarValue::UInt8(Some(1_u8))), - ]) - .expect("failed to initialize function array_ndims") - .into_array(1); + let array = array_length(&[list_array, Arc::new(Int64Array::from_value(1, 1))]) + .expect("failed to initialize function array_ndims"); let result = - as_uint8_array(&array).expect("failed to initialize function array_ndims"); + as_uint64_array(&array).expect("failed to initialize function array_ndims"); - assert_eq!(result, &UInt8Array::from(vec![4])); + assert_eq!(result, &UInt64Array::from_value(4, 1)); } #[test] fn test_nested_array_length() { - let list_array = return_nested_array(); + let list_array = return_nested_array().into_array(1); + + // array_length([[1, 2, 3, 4], [5, 6, 7, 8]]) = 2 + let arr = array_length(&[list_array.clone()]) + .expect("failed to initialize function array_length"); + let result = + as_uint64_array(&arr).expect("failed to initialize function array_length"); + + assert_eq!(result, &UInt64Array::from_value(2, 1)); // array_length([[1, 2, 3, 4], [5, 6, 7, 8]], 1) = 2 - let array = array_length(&[ - list_array.clone(), - ColumnarValue::Scalar(ScalarValue::UInt8(Some(1_u8))), - ]) - .expect("failed to initialize function array_length") - .into_array(1); + let arr = + array_length(&[list_array.clone(), Arc::new(Int64Array::from_value(1, 1))]) + .expect("failed to initialize function array_length"); let result = - as_uint8_array(&array).expect("failed to initialize function array_length"); + as_uint64_array(&arr).expect("failed to initialize function array_length"); - assert_eq!(result, &UInt8Array::from(vec![2])); + assert_eq!(result, &UInt64Array::from_value(2, 1)); // array_length([[1, 2, 3, 4], [5, 6, 7, 8]], 2) = 4 - let array = array_length(&[ - list_array.clone(), - ColumnarValue::Scalar(ScalarValue::UInt8(Some(2_u8))), - ]) - .expect("failed to initialize function array_length") - .into_array(1); + let arr = + array_length(&[list_array.clone(), Arc::new(Int64Array::from_value(2, 1))]) + .expect("failed to initialize function array_length"); let result = - as_uint8_array(&array).expect("failed to initialize function array_length"); + as_uint64_array(&arr).expect("failed to initialize function array_length"); - assert_eq!(result, &UInt8Array::from(vec![4])); + assert_eq!(result, &UInt64Array::from_value(4, 1)); // array_length([[1, 2, 3, 4], [5, 6, 7, 8]], 3) = NULL - let array = array_length(&[ - list_array, - ColumnarValue::Scalar(ScalarValue::UInt8(Some(3_u8))), - ]) - .expect("failed to initialize function array_length") - .into_array(1); + let arr = array_length(&[list_array, Arc::new(Int64Array::from_value(3, 1))]) + .expect("failed to initialize function array_length"); let result = - as_uint8_array(&array).expect("failed to initialize function array_length"); + as_uint64_array(&arr).expect("failed to initialize function array_length"); - assert_eq!(result, &UInt8Array::from(vec![None])); + assert_eq!(result, &UInt64Array::from(vec![None])); } #[test] @@ -1896,7 +1819,7 @@ mod tests { result .value(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .values() ); @@ -1917,7 +1840,7 @@ mod tests { result .value(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .values() ); @@ -1931,9 +1854,9 @@ mod tests { let array = array_ndims(&[list_array]) .expect("failed to initialize function array_ndims"); let result = - as_uint8_array(&array).expect("failed to initialize function array_ndims"); + as_uint64_array(&array).expect("failed to initialize function array_ndims"); - assert_eq!(result, &UInt8Array::from(vec![1])); + assert_eq!(result, &UInt64Array::from_value(1, 1)); } #[test] @@ -1944,9 +1867,9 @@ mod tests { let array = array_ndims(&[list_array]) .expect("failed to initialize function array_ndims"); let result = - as_uint8_array(&array).expect("failed to initialize function array_ndims"); + as_uint64_array(&array).expect("failed to initialize function array_ndims"); - assert_eq!(result, &UInt8Array::from(vec![2])); + assert_eq!(result, &UInt64Array::from_value(2, 1)); } #[test] diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 215582a1a8be..6a81042b7cb4 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -418,7 +418,9 @@ pub fn create_physical_fun( Arc::new(|args| make_scalar_function(array_expressions::array_dims)(args)) } BuiltinScalarFunction::ArrayFill => Arc::new(array_expressions::array_fill), - BuiltinScalarFunction::ArrayLength => Arc::new(array_expressions::array_length), + BuiltinScalarFunction::ArrayLength => { + Arc::new(|args| make_scalar_function(array_expressions::array_length)(args)) + } BuiltinScalarFunction::ArrayNdims => { Arc::new(|args| make_scalar_function(array_expressions::array_ndims)(args)) } @@ -436,7 +438,9 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayToString => Arc::new(|args| { make_scalar_function(array_expressions::array_to_string)(args) }), - BuiltinScalarFunction::Cardinality => Arc::new(array_expressions::cardinality), + BuiltinScalarFunction::Cardinality => { + Arc::new(|args| make_scalar_function(array_expressions::cardinality)(args)) + } BuiltinScalarFunction::MakeArray => Arc::new(array_expressions::make_array), BuiltinScalarFunction::TrimArray => { Arc::new(|args| make_scalar_function(array_expressions::trim_array)(args))