Support FixedSizeList type coercion #8902

Merged
merged 24 commits into from
Feb 1, 2024
Changes from 23 commits
5 changes: 3 additions & 2 deletions datafusion/common/src/scalar.rs
@@ -1483,7 +1483,9 @@ impl ScalarValue {
DataType::Interval(IntervalUnit::MonthDayNano) => {
build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
}
DataType::List(_) | DataType::LargeList(_) => build_list_array(scalars)?,
DataType::List(_)
| DataType::LargeList(_)
| DataType::FixedSizeList(_, _) => build_list_array(scalars)?,
DataType::Struct(fields) => {
// Initialize a Vector to store the ScalarValues for each column
let mut columns: Vec<Vec<ScalarValue>> =
@@ -1595,7 +1597,6 @@ impl ScalarValue {
| DataType::Time64(TimeUnit::Second)
| DataType::Time64(TimeUnit::Millisecond)
| DataType::Duration(_)
| DataType::FixedSizeList(_, _)
| DataType::Union(_, _)
| DataType::Map(_, _)
| DataType::RunEndEncoded(_, _) => {
53 changes: 35 additions & 18 deletions datafusion/common/src/utils.rs
@@ -440,9 +440,9 @@ pub fn arrays_into_list_array(
/// ```
pub fn base_type(data_type: &DataType) -> DataType {
match data_type {
DataType::List(field) | DataType::LargeList(field) => {
base_type(field.data_type())
}
DataType::List(field)
| DataType::LargeList(field)
| DataType::FixedSizeList(field, _) => base_type(field.data_type()),
_ => data_type.to_owned(),
}
}
@@ -464,31 +464,23 @@ pub fn coerced_type_with_base_type_only(
base_type: &DataType,
) -> DataType {
match data_type {
DataType::List(field) => {
let data_type = match field.data_type() {
DataType::List(_) => {
coerced_type_with_base_type_only(field.data_type(), base_type)
}
_ => base_type.to_owned(),
};
DataType::List(field) | DataType::FixedSizeList(field, _) => {
let field_type =
coerced_type_with_base_type_only(field.data_type(), base_type);

DataType::List(Arc::new(Field::new(
field.name(),
data_type,
field_type,
field.is_nullable(),
)))
}
DataType::LargeList(field) => {
let data_type = match field.data_type() {
DataType::LargeList(_) => {
coerced_type_with_base_type_only(field.data_type(), base_type)
}
_ => base_type.to_owned(),
};
let field_type =
coerced_type_with_base_type_only(field.data_type(), base_type);

DataType::LargeList(Arc::new(Field::new(
field.name(),
data_type,
field_type,
field.is_nullable(),
)))
}
@@ -497,6 +489,31 @@ pub fn coerced_type_with_base_type_only(
}
}

pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
match data_type {
DataType::List(field) | DataType::FixedSizeList(field, _) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());

DataType::List(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
DataType::LargeList(field) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());

DataType::LargeList(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}

_ => data_type.clone(),
}
}

/// Compute the number of dimensions in a list data type.
pub fn list_ndims(data_type: &DataType) -> u64 {
match data_type {
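The two helpers touched in this file can be hard to picture from the diff alone. Below is a minimal sketch on a simplified `Ty` enum, a hypothetical stand-in for arrow's `DataType` that drops field names and nullability. It assumes that the fall-through arm of `coerced_type_with_base_type_only`, which the hunk does not show, returns the new base type.

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Int32,
    Int64,
    List(Box<Ty>),
    LargeList(Box<Ty>),
    FixedSizeList(Box<Ty>, i32),
}

/// Innermost element type, looking through every list flavor.
fn base_type(ty: &Ty) -> Ty {
    match ty {
        Ty::List(inner) | Ty::LargeList(inner) | Ty::FixedSizeList(inner, _) => {
            base_type(inner)
        }
        other => other.clone(),
    }
}

/// Keeps the nesting depth but swaps the innermost type for `base`.
/// As in the diff, a `FixedSizeList` level comes back as a plain `List`.
fn coerced_type_with_base_type_only(ty: &Ty, base: &Ty) -> Ty {
    match ty {
        Ty::List(inner) | Ty::FixedSizeList(inner, _) => {
            Ty::List(Box::new(coerced_type_with_base_type_only(inner, base)))
        }
        Ty::LargeList(inner) => {
            Ty::LargeList(Box::new(coerced_type_with_base_type_only(inner, base)))
        }
        // Assumption: the arm not shown in the hunk returns the base type.
        _ => base.clone(),
    }
}
```

With these definitions, coercing a `FixedSizeList` of `List` of `Int32` to base `Int64` yields `List(List(Int64))`: the depth is preserved while the fixed-size level is relaxed to `List`.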
46 changes: 28 additions & 18 deletions datafusion/expr/src/built_in_function.rs
@@ -599,10 +599,11 @@ impl BuiltinScalarFunction {
}
BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
List(field) => Ok(field.data_type().clone()),
LargeList(field) => Ok(field.data_type().clone()),
List(field)
| LargeList(field)
| FixedSizeList(field, _) => Ok(field.data_type().clone()),
_ => plan_err!(
"The {self} function can only accept list or largelist as the first argument"
"The {self} function can only accept List, LargeList or FixedSizeList as the first argument"
),
},
BuiltinScalarFunction::ArrayLength => Ok(UInt64),
@@ -944,10 +945,9 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySort => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayAppend => Signature {
type_signature: ArrayAndElement,
volatility: self.volatility(),
},
BuiltinScalarFunction::ArrayAppend => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::MakeArray => {
// 0 or more arguments of arbitrary type
Signature::one_of(vec![VariadicEqual, Any(0)], self.volatility())
@@ -959,12 +959,17 @@ impl BuiltinScalarFunction {
}
BuiltinScalarFunction::ArrayDims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayEmpty => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayElement => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayElement => {
Signature::array_and_index(self.volatility())
}
BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Flatten => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayHasAll
| BuiltinScalarFunction::ArrayHasAny
| BuiltinScalarFunction::ArrayHas => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayHasAll | BuiltinScalarFunction::ArrayHasAny => {
Signature::any(2, self.volatility())
}
BuiltinScalarFunction::ArrayHas => {
Signature::array_and_element(self.volatility())
Contributor

Can you add an example that works after this change? I did not find one in array.slt.

One example is array_has(list, null).

Member Author

array_has_all and array_has_any cannot use array_and_element, because their signature is (array, array).
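The doc comment on `ArrayAndElement` says the element's list dimension should be one less than the array's, which is why functions taking two arrays of the same depth keep `Signature::any(2, ..)`. A minimal sketch of that rule, using a hypothetical `Ty` enum in place of arrow's `DataType` and a hypothetical `fits_array_and_element` helper (the real check lives in the coercion code and may differ in detail):

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Int64,
    List(Box<Ty>),
}

/// Number of list levels wrapped around the base type.
fn list_ndims(ty: &Ty) -> u64 {
    match ty {
        Ty::List(inner) => 1 + list_ndims(inner),
        _ => 0,
    }
}

/// Hypothetical helper: true when `element` is exactly one list level
/// shallower than `array`, as the `ArrayAndElement` doc comment requires.
fn fits_array_and_element(array: &Ty, element: &Ty) -> bool {
    list_ndims(array) == list_ndims(element) + 1
}
```

Under this sketch a call shaped like `array_has(List<Int64>, Int64)` fits, while a call shaped like `array_has_any(List<Int64>, List<Int64>)` does not, because both arguments have one list level.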

}
BuiltinScalarFunction::ArrayLength => {
Signature::variadic_any(self.volatility())
}
@@ -973,15 +978,20 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPosition => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayPositions => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayPrepend => Signature {
type_signature: ElementAndArray,
volatility: self.volatility(),
},
BuiltinScalarFunction::ArrayPositions => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::ArrayPrepend => {
Signature::element_and_array(self.volatility())
}
BuiltinScalarFunction::ArrayRepeat => Signature::any(2, self.volatility()),
Member Author

Although array_repeat, array_union, array_intersect, and array_except also take two arguments, Signature::array_and_element does not handle NULL well for them. A follow-up PR can continue working on this issue.

BuiltinScalarFunction::ArrayRemove => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayRemove => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::ArrayRemoveN => Signature::any(3, self.volatility()),
BuiltinScalarFunction::ArrayRemoveAll => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayRemoveAll => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::ArrayReplace => Signature::any(3, self.volatility()),
BuiltinScalarFunction::ArrayReplaceN => Signature::any(4, self.volatility()),
BuiltinScalarFunction::ArrayReplaceAll => {
57 changes: 52 additions & 5 deletions datafusion/expr/src/signature.rs
@@ -116,6 +116,12 @@ pub enum TypeSignature {
/// Function `make_array` takes 0 or more arguments with arbitrary types, its `TypeSignature`
/// is `OneOf(vec![Any(0), VariadicAny])`.
OneOf(Vec<TypeSignature>),
/// Specifies Signatures for array functions
ArraySignature(ArrayFunctionSignature),
Comment on lines +119 to +120
Member Author

For future extension.

Contributor

I think that is a good change, though it is a change to the API.

}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ArrayFunctionSignature {
/// Specialized Signature for ArrayAppend and similar functions
/// The first argument should be List/LargeList, and the second argument should be non-list or list.
/// The second argument's list dimension should be one dimension less than the first argument's list dimension.
@@ -126,6 +132,23 @@ pub enum TypeSignature {
/// The first argument should be non-list or list, and the second argument should be List/LargeList.
/// The first argument's list dimension should be one dimension less than the second argument's list dimension.
ElementAndArray,
ArrayAndIndex,
}

impl std::fmt::Display for ArrayFunctionSignature {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ArrayFunctionSignature::ArrayAndElement => {
write!(f, "array, element")
}
ArrayFunctionSignature::ElementAndArray => {
write!(f, "element, array")
}
ArrayFunctionSignature::ArrayAndIndex => {
write!(f, "array, index")
}
}
}
}

impl TypeSignature {
@@ -156,11 +179,8 @@ impl TypeSignature {
TypeSignature::OneOf(sigs) => {
sigs.iter().flat_map(|s| s.to_string_repr()).collect()
}
TypeSignature::ArrayAndElement => {
vec!["ArrayAndElement(List<T>, T)".to_string()]
}
TypeSignature::ElementAndArray => {
vec!["ElementAndArray(T, List<T>)".to_string()]
TypeSignature::ArraySignature(array_signature) => {
vec![array_signature.to_string()]
}
}
}
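Nesting the array variants under `TypeSignature::ArraySignature` means the string representation is now delegated to the inner enum's `Display` impl. A trimmed-down sketch of how the two pieces fit together; `TypeSignature` is reduced here to two variants for illustration, and the `Any` arm is an assumption about the existing code rather than something shown in this diff:

```rust
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ArrayFunctionSignature {
    ArrayAndElement,
    ElementAndArray,
    ArrayAndIndex,
}

impl fmt::Display for ArrayFunctionSignature {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            ArrayFunctionSignature::ArrayAndElement => "array, element",
            ArrayFunctionSignature::ElementAndArray => "element, array",
            ArrayFunctionSignature::ArrayAndIndex => "array, index",
        };
        f.write_str(text)
    }
}

/// Reduced stand-in for `TypeSignature`; the real enum has more variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TypeSignature {
    Any(usize),
    ArraySignature(ArrayFunctionSignature),
}

impl TypeSignature {
    fn to_string_repr(&self) -> Vec<String> {
        match self {
            // Assumed formatting for `Any`; not part of this diff.
            TypeSignature::Any(n) => vec![vec!["Any"; *n].join(", ")],
            // New arm: defer to the inner enum's `Display` impl.
            TypeSignature::ArraySignature(sig) => vec![sig.to_string()],
        }
    }
}
```

One consequence of this design is that adding a further array signature only requires a new `ArrayFunctionSignature` variant and a `Display` arm; `to_string_repr` stays unchanged.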
@@ -263,6 +283,33 @@ impl Signature {
volatility,
}
}
/// Specialized Signature for ArrayAppend and similar functions
Contributor

Nice cleanup

pub fn array_and_element(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndElement,
),
volatility,
}
}
/// Specialized Signature for ArrayPrepend and similar functions
pub fn element_and_array(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ElementAndArray,
),
volatility,
}
}
/// Specialized Signature for ArrayElement and similar functions
pub fn array_and_index(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndIndex,
),
volatility,
}
}
}

/// Monotonicity of the `ScalarFunctionExpr` with respect to its arguments.
62 changes: 45 additions & 17 deletions datafusion/expr/src/type_coercion/functions.rs
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use crate::signature::TIMEZONE_WILDCARD;
use crate::signature::{ArrayFunctionSignature, TIMEZONE_WILDCARD};
use crate::{Signature, TypeSignature};
use arrow::{
compute::can_cast_types,
datatypes::{DataType, TimeUnit},
};
use datafusion_common::utils::list_ndims;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_common::utils::{coerced_fixed_size_list_to_list, list_ndims};
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, DataFusionError, Result,
};

use super::binary::comparison_coercion;

@@ -48,7 +50,6 @@ pub fn data_types(
);
}
}

let valid_types = get_valid_types(&signature.type_signature, current_types)?;

if valid_types
@@ -104,30 +105,48 @@ fn get_valid_types(
let elem_base_type = datafusion_common::utils::base_type(elem_type);
let new_base_type = comparison_coercion(&array_base_type, &elem_base_type);

if new_base_type.is_none() {
return internal_err!(
let new_base_type = new_base_type.ok_or_else(|| {
internal_datafusion_err!(
"Coercion from {array_base_type:?} to {elem_base_type:?} not supported."
);
}
let new_base_type = new_base_type.unwrap();
)
})?;

let array_type = datafusion_common::utils::coerced_type_with_base_type_only(
array_type,
&new_base_type,
);

match array_type {
DataType::List(ref field) | DataType::LargeList(ref field) => {
DataType::List(ref field)
| DataType::LargeList(ref field)
| DataType::FixedSizeList(ref field, _) => {
let elem_type = field.data_type();
if is_append {
Ok(vec![vec![array_type.clone(), elem_type.to_owned()]])
Ok(vec![vec![array_type.clone(), elem_type.clone()]])
} else {
Ok(vec![vec![elem_type.to_owned(), array_type.clone()]])
}
}
_ => Ok(vec![vec![]]),
}
}
fn array_and_index(current_types: &[DataType]) -> Result<Vec<Vec<DataType>>> {
if current_types.len() != 2 {
return Ok(vec![vec![]]);
}

let array_type = &current_types[0];

match array_type {
DataType::List(_)
| DataType::LargeList(_)
| DataType::FixedSizeList(_, _) => {
let array_type = coerced_fixed_size_list_to_list(array_type);
Ok(vec![vec![array_type, DataType::Int64]])
}
_ => Ok(vec![vec![]]),
}
}
let valid_types = match signature {
TypeSignature::Variadic(valid_types) => valid_types
.iter()
@@ -160,12 +179,19 @@ fn get_valid_types(
}

TypeSignature::Exact(valid_types) => vec![valid_types.clone()],
TypeSignature::ArrayAndElement => {
return array_append_or_prepend_valid_types(current_types, true)
}
TypeSignature::ElementAndArray => {
return array_append_or_prepend_valid_types(current_types, false)
}
TypeSignature::ArraySignature(ref function_signature) => match function_signature
{
ArrayFunctionSignature::ArrayAndElement => {
return array_append_or_prepend_valid_types(current_types, true)
}
ArrayFunctionSignature::ArrayAndIndex => {
return array_and_index(current_types)
}
ArrayFunctionSignature::ElementAndArray => {
return array_append_or_prepend_valid_types(current_types, false)
}
},

TypeSignature::Any(number) => {
if current_types.len() != *number {
return plan_err!(
@@ -311,6 +337,8 @@ fn coerced_from<'a>(
Utf8 | LargeUtf8 => Some(type_into.clone()),
Null if can_cast_types(type_from, type_into) => Some(type_into.clone()),

List(_) if matches!(type_from, FixedSizeList(_, _)) => Some(type_into.clone()),
Contributor @jayzhan211, Jan 22, 2024

Would it be easier to convert FixedSizeList to List, as coerce_arguments_for_fun does, before this signature type coercion?

Member Author @Weijun-H, Jan 22, 2024

I am unable to decide between coerce_arguments_for_fun and coerce_arguments_for_signature. Are there specific benefits to each?

Contributor

One coerces based on the function, the other based on the signature. If the goal is to convert FixedSizeList to List, coerce_arguments_for_fun seems more suitable.

Contributor

I still think the FixedSizeList-to-List conversion can live in one place.
Currently it happens both here and in coerce_arguments_for_fun, right? Is it possible to do it only once?

Contributor

Could we move coerce_arguments_for_fun before the signature coercion? If FixedSizeList is converted to List before signature coercion, we don't need to deal with FixedSizeList here.

Member Author @Weijun-H, Jan 28, 2024

Yes, we can.

ExprSchemable (https://github.com/apache/arrow-datafusion/blob/a7cdf605a1e23608e51889988c239613c80fb671/datafusion/expr/src/expr_schema.rs#L91-L105)
forces the function return type to be consistent, but I am not sure of its purpose. Maybe @2010YOUY01 could explain it more.

Contributor

I think we don't need this anymore; input types should be handled later on via coerce_arguments_for_signature. They do the same thing.
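To make the conversion under discussion concrete, here is a minimal sketch of what the `array_and_index` coercion in this diff does: any list-like first argument is normalized so that `FixedSizeList` becomes `List`, and the index is coerced to `Int64`. The `Ty` enum is a hypothetical stand-in for arrow's `DataType` (with `LargeList` left out for brevity), and the real function returns a `Result`.

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Int32,
    Int64,
    List(Box<Ty>),
    FixedSizeList(Box<Ty>, i32),
}

/// Rewrites every `FixedSizeList` level to `List`.
fn fixed_size_list_to_list(ty: &Ty) -> Ty {
    match ty {
        Ty::List(inner) | Ty::FixedSizeList(inner, _) => {
            Ty::List(Box::new(fixed_size_list_to_list(inner)))
        }
        other => other.clone(),
    }
}

/// Candidate argument types for an (array, index) call. Like the code in
/// the diff, it yields a single empty candidate when the call does not fit.
fn array_and_index(current: &[Ty]) -> Vec<Vec<Ty>> {
    match current {
        [array @ (Ty::List(_) | Ty::FixedSizeList(_, _)), _index] => {
            vec![vec![fixed_size_list_to_list(array), Ty::Int64]]
        }
        _ => vec![vec![]],
    }
}
```

In this sketch, a `FixedSizeList` of `Int32` paired with an `Int32` index is coerced to the pair `List(Int32)` and `Int64`, which is the single place where the fixed-size shape is dropped.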

Contributor @jayzhan211, Jan 29, 2024

// Only accept list and largelist with the same number of dimensions unless the type is Null.
// List or LargeList with different dimensions should be handled in TypeSignature or other places before this.
List(_) | LargeList(_)
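The `new_base_type` change in `get_valid_types` earlier in this file replaces an `is_none()` check followed by `unwrap()` with `ok_or_else(..)?`. A minimal standalone sketch of that idiom follows; `common_type` is a hypothetical stand-in for `comparison_coercion`, and `CoercionError` is a hypothetical stand-in for `DataFusionError`.

```rust
/// Hypothetical error type standing in for `DataFusionError`.
#[derive(Debug, PartialEq)]
struct CoercionError(String);

/// Hypothetical stand-in for `comparison_coercion`, over type names.
fn common_type(lhs: &str, rhs: &str) -> Option<String> {
    match (lhs, rhs) {
        (a, b) if a == b => Some(a.to_string()),
        ("Int32", "Int64") | ("Int64", "Int32") => Some("Int64".to_string()),
        _ => None,
    }
}

/// Converts the `Option` into a `Result` and propagates the error with `?`,
/// so no separate `is_none()` check or `unwrap()` is needed.
fn coerce(lhs: &str, rhs: &str) -> Result<String, CoercionError> {
    let new_type = common_type(lhs, rhs).ok_or_else(|| {
        CoercionError(format!("Coercion from {lhs:?} to {rhs:?} not supported."))
    })?;
    Ok(new_type)
}
```

Because the closure passed to `ok_or_else` only runs in the `None` case, the error message is not formatted on the success path.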
1 change: 0 additions & 1 deletion datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -589,7 +589,6 @@ fn coerce_arguments_for_fun(
if expressions.is_empty() {
return Ok(vec![]);
}

let mut expressions: Vec<Expr> = expressions.to_vec();

// Cast Fixedsizelist to List for array functions