From 77f43c55f1d8175efa09821b44d32e823e433b43 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Mon, 22 Apr 2024 06:44:49 -0400 Subject: [PATCH] Move coalesce to datafusion-functions and remove BuiltInScalarFunction (#10098) --- .../core/src/datasource/listing/helpers.rs | 10 - datafusion/expr/src/built_in_function.rs | 207 ------------------ datafusion/expr/src/expr.rs | 22 +- datafusion/expr/src/expr_fn.rs | 25 +-- datafusion/expr/src/expr_schema.rs | 17 -- datafusion/expr/src/lib.rs | 2 - datafusion/expr/src/tree_node.rs | 3 - datafusion/functions/src/math/coalesce.rs | 141 ++++++++++++ datafusion/functions/src/math/mod.rs | 8 + .../optimizer/src/analyzer/type_coercion.rs | 10 - .../simplify_expressions/expr_simplifier.rs | 3 - .../src/conditional_expressions.rs | 79 ------- datafusion/physical-expr/src/functions.rs | 55 +---- datafusion/physical-expr/src/lib.rs | 1 - datafusion/physical-expr/src/planner.rs | 25 +-- .../physical-expr/src/scalar_function.rs | 39 +--- datafusion/proto/proto/datafusion.proto | 2 +- datafusion/proto/src/generated/pbjson.rs | 3 - datafusion/proto/src/generated/prost.rs | 8 +- .../proto/src/logical_plan/from_proto.rs | 19 +- datafusion/proto/src/logical_plan/to_proto.rs | 61 ++---- .../proto/src/physical_plan/from_proto.rs | 25 +-- .../proto/src/physical_plan/to_proto.rs | 60 ++--- datafusion/proto/tests/cases/serialize.rs | 7 +- datafusion/sql/src/expr/function.rs | 11 +- .../sqllogictest/test_files/timestamps.slt | 6 +- .../substrait/src/logical_plan/consumer.rs | 23 +- 27 files changed, 241 insertions(+), 631 deletions(-) delete mode 100644 datafusion/expr/src/built_in_function.rs create mode 100644 datafusion/functions/src/math/coalesce.rs delete mode 100644 datafusion/physical-expr/src/conditional_expressions.rs diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index f97d465c442b..b415ce9d913e 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -90,16 +90,6 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { Expr::ScalarFunction(scalar_function) => { match &scalar_function.func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - match fun.volatility() { - Volatility::Immutable => Ok(TreeNodeRecursion::Continue), - // TODO: Stable functions could be `applicable`, but that would require access to the context - Volatility::Stable | Volatility::Volatile => { - is_applicable = false; - Ok(TreeNodeRecursion::Stop) - } - } - } ScalarFunctionDefinition::UDF(fun) => { match fun.signature().volatility { Volatility::Immutable => Ok(TreeNodeRecursion::Continue), diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs deleted file mode 100644 index 83eb2f722b08..000000000000 --- a/datafusion/expr/src/built_in_function.rs +++ /dev/null @@ -1,207 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Built-in functions module contains all the built-in functions definitions. - -use std::collections::HashMap; -use std::fmt; -use std::str::FromStr; -use std::sync::OnceLock; - -use crate::type_coercion::functions::data_types; -use crate::{FuncMonotonicity, Signature, Volatility}; - -use arrow::datatypes::DataType; -use datafusion_common::{plan_err, DataFusionError, Result}; - -use strum::IntoEnumIterator; -use strum_macros::EnumIter; - -/// Enum of all built-in scalar functions -// Contributor's guide for adding new scalar functions -// https://arrow.apache.org/datafusion/contributor-guide/index.html#how-to-add-a-new-scalar-function -#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter, Copy)] -pub enum BuiltinScalarFunction { - // math functions - /// coalesce - Coalesce, -} - -/// Maps the sql function name to `BuiltinScalarFunction` -fn name_to_function() -> &'static HashMap<&'static str, BuiltinScalarFunction> { - static NAME_TO_FUNCTION_LOCK: OnceLock> = - OnceLock::new(); - NAME_TO_FUNCTION_LOCK.get_or_init(|| { - let mut map = HashMap::new(); - BuiltinScalarFunction::iter().for_each(|func| { - func.aliases().iter().for_each(|&a| { - map.insert(a, func); - }); - }); - map - }) -} - -/// Maps `BuiltinScalarFunction` --> canonical sql function -/// First alias in the array is used to display function names -fn function_to_name() -> &'static HashMap { - static FUNCTION_TO_NAME_LOCK: OnceLock> = - OnceLock::new(); - FUNCTION_TO_NAME_LOCK.get_or_init(|| { - let mut map = HashMap::new(); - BuiltinScalarFunction::iter().for_each(|func| { - map.insert(func, *func.aliases().first().unwrap_or(&"NO_ALIAS")); - }); - map - }) -} - -impl BuiltinScalarFunction { - /// an allowlist of functions to take zero arguments, so that they will get special treatment - /// while executing. - #[deprecated( - since = "32.0.0", - note = "please use TypeSignature::supports_zero_argument instead" - )] - pub fn supports_zero_argument(&self) -> bool { - self.signature().type_signature.supports_zero_argument() - } - - /// Returns the name of this function - pub fn name(&self) -> &str { - // .unwrap is safe here because compiler makes sure the map will have matches for each BuiltinScalarFunction - function_to_name().get(self).unwrap() - } - - /// Returns the [Volatility] of the builtin function. - pub fn volatility(&self) -> Volatility { - match self { - // Immutable scalar builtins - BuiltinScalarFunction::Coalesce => Volatility::Immutable, - } - } - - /// Returns the output [`DataType`] of this function - /// - /// This method should be invoked only after `input_expr_types` have been validated - /// against the function's `TypeSignature` using `type_coercion::functions::data_types()`. - /// - /// This method will: - /// 1. Perform additional checks on `input_expr_types` that are beyond the scope of `TypeSignature` validation. - /// 2. Deduce the output `DataType` based on the provided `input_expr_types`. - pub fn return_type(self, input_expr_types: &[DataType]) -> Result { - // Note that this function *must* return the same type that the respective physical expression returns - // or the execution panics. - - // the return type of the built in function. - // Some built-in functions' return type depends on the incoming type. - match self { - BuiltinScalarFunction::Coalesce => { - // COALESCE has multiple args and they might get coerced, get a preview of this - let coerced_types = data_types(input_expr_types, &self.signature()); - coerced_types.map(|types| types[0].clone()) - } - } - } - - /// Return the argument [`Signature`] supported by this function - pub fn signature(&self) -> Signature { - // note: the physical expression must accept the type returned by this function or the execution panics. - - // for now, the list is small, as we do not have many built-in functions. - match self { - BuiltinScalarFunction::Coalesce => { - Signature::variadic_equal(self.volatility()) - } - } - } - - /// This function specifies monotonicity behaviors for built-in scalar functions. - /// The list can be extended, only mathematical and datetime functions are - /// considered for the initial implementation of this feature. - pub fn monotonicity(&self) -> Option { - None - } - - /// Returns all names that can be used to call this function - pub fn aliases(&self) -> &'static [&'static str] { - match self { - // conditional functions - BuiltinScalarFunction::Coalesce => &["coalesce"], - } - } -} - -impl fmt::Display for BuiltinScalarFunction { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.name()) - } -} - -impl FromStr for BuiltinScalarFunction { - type Err = DataFusionError; - fn from_str(name: &str) -> Result { - if let Some(func) = name_to_function().get(name) { - Ok(*func) - } else { - plan_err!("There is no built-in function named {name}") - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - // Test for BuiltinScalarFunction's Display and from_str() implementations. - // For each variant in BuiltinScalarFunction, it converts the variant to a string - // and then back to a variant. The test asserts that the original variant and - // the reconstructed variant are the same. This assertion is also necessary for - // function suggestion. See https://github.com/apache/arrow-datafusion/issues/8082 - fn test_display_and_from_str() { - for (_, func_original) in name_to_function().iter() { - let func_name = func_original.to_string(); - let func_from_str = BuiltinScalarFunction::from_str(&func_name).unwrap(); - assert_eq!(func_from_str, *func_original); - } - } - - #[test] - fn test_coalesce_return_types() { - let coalesce = BuiltinScalarFunction::Coalesce; - let return_type = coalesce - .return_type(&[DataType::Date32, DataType::Date32]) - .unwrap(); - assert_eq!(return_type, DataType::Date32); - } - - #[test] - fn test_coalesce_return_types_dictionary() { - let coalesce = BuiltinScalarFunction::Coalesce; - let return_type = coalesce - .return_type(&[ - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - DataType::Utf8, - ]) - .unwrap(); - assert_eq!( - return_type, - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) - ); - } -} diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 2a8fe4ca90a5..08d495c3be35 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -28,8 +28,8 @@ use crate::logical_plan::Subquery; use crate::utils::expr_to_columns; use crate::window_frame; use crate::{ - aggregate_function, built_in_function, built_in_window_function, udaf, - BuiltinScalarFunction, ExprSchemable, Operator, Signature, + aggregate_function, built_in_window_function, udaf, ExprSchemable, Operator, + Signature, }; use arrow::datatypes::DataType; @@ -362,10 +362,6 @@ impl Between { #[derive(Debug, Clone, PartialEq, Eq, Hash)] /// Defines which implementation of a function for DataFusion to call. pub enum ScalarFunctionDefinition { - /// Resolved to a `BuiltinScalarFunction` - /// There is plan to migrate `BuiltinScalarFunction` to UDF-based implementation (issue#8045) - /// This variant is planned to be removed in long term - BuiltIn(BuiltinScalarFunction), /// Resolved to a user defined function UDF(Arc), /// A scalar function constructed with name. This variant can not be executed directly @@ -393,7 +389,6 @@ impl ScalarFunctionDefinition { /// Function's name for display pub fn name(&self) -> &str { match self { - ScalarFunctionDefinition::BuiltIn(fun) => fun.name(), ScalarFunctionDefinition::UDF(udf) => udf.name(), ScalarFunctionDefinition::Name(func_name) => func_name.as_ref(), } @@ -403,9 +398,6 @@ impl ScalarFunctionDefinition { /// when evaluated multiple times with the same input. pub fn is_volatile(&self) -> Result { match self { - ScalarFunctionDefinition::BuiltIn(fun) => { - Ok(fun.volatility() == crate::Volatility::Volatile) - } ScalarFunctionDefinition::UDF(udf) => { Ok(udf.signature().volatility == crate::Volatility::Volatile) } @@ -419,14 +411,6 @@ impl ScalarFunctionDefinition { } impl ScalarFunction { - /// Create a new ScalarFunction expression - pub fn new(fun: built_in_function::BuiltinScalarFunction, args: Vec) -> Self { - Self { - func_def: ScalarFunctionDefinition::BuiltIn(fun), - args, - } - } - /// Create a new ScalarFunction expression with a user-defined function (UDF) pub fn new_udf(udf: Arc, args: Vec) -> Self { Self { @@ -1282,7 +1266,7 @@ impl Expr { pub fn short_circuits(&self) -> bool { match self { Expr::ScalarFunction(ScalarFunction { func_def, .. }) => { - matches!(func_def, ScalarFunctionDefinition::BuiltIn(fun) if *fun == BuiltinScalarFunction::Coalesce) + matches!(func_def, ScalarFunctionDefinition::UDF(fun) if fun.name().eq("coalesce")) } Expr::BinaryExpr(BinaryExpr { op, .. }) => { matches!(op, Operator::And | Operator::Or) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 567f260daaf9..1d976a12cc4f 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -19,15 +19,15 @@ use crate::expr::{ AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery, - Placeholder, ScalarFunction, TryCast, + Placeholder, TryCast, }; use crate::function::{ AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory, }; use crate::{ - aggregate_function, built_in_function, conditional_expressions::CaseBuilder, - logical_plan::Subquery, AggregateUDF, Expr, LogicalPlan, Operator, - ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, + aggregate_function, conditional_expressions::CaseBuilder, logical_plan::Subquery, + AggregateUDF, Expr, LogicalPlan, Operator, ScalarFunctionImplementation, ScalarUDF, + Signature, Volatility, }; use crate::{AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl}; use arrow::datatypes::{DataType, Field}; @@ -478,23 +478,6 @@ pub fn is_not_unknown(expr: Expr) -> Expr { Expr::IsNotUnknown(Box::new(expr)) } -macro_rules! nary_scalar_expr { - ($ENUM:ident, $FUNC:ident, $DOC:expr) => { - #[doc = $DOC ] - pub fn $FUNC(args: Vec) -> Expr { - Expr::ScalarFunction(ScalarFunction::new( - built_in_function::BuiltinScalarFunction::$ENUM, - args, - )) - } - }; -} - -// generate methods for creating the supported unary/binary expressions - -// math functions -nary_scalar_expr!(Coalesce, coalesce, "returns `coalesce(args...)`, which evaluates to the value of the first [Expr] which is not NULL"); - /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. pub fn case(expr: Expr) -> CaseBuilder { CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 466fd13ce207..e01ec2296a32 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -139,23 +139,6 @@ impl ExprSchemable for Expr { .map(|e| e.get_type(schema)) .collect::>>()?; match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - // verify that function is invoked with correct number and type of arguments as defined in `TypeSignature` - data_types(&arg_data_types, &fun.signature()).map_err(|_| { - plan_datafusion_err!( - "{}", - utils::generate_signature_error_msg( - &format!("{fun}"), - fun.signature(), - &arg_data_types, - ) - ) - })?; - - // perform additional function arguments validation (due to limited - // expressiveness of `TypeSignature`), then infer return type - fun.return_type(&arg_data_types) - } ScalarFunctionDefinition::UDF(fun) => { // verify that function is invoked with correct number and type of arguments as defined in `TypeSignature` data_types(&arg_data_types, fun.signature()).map_err(|_| { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 36732324eff6..7325b9e64f10 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -26,7 +26,6 @@ //! The [expr_fn] module contains functions for creating expressions. mod accumulator; -mod built_in_function; mod built_in_window_function; mod columnar_value; mod literal; @@ -60,7 +59,6 @@ pub mod window_state; pub use accumulator::Accumulator; pub use aggregate_function::AggregateFunction; -pub use built_in_function::BuiltinScalarFunction; pub use built_in_window_function::BuiltInWindowFunction; pub use columnar_value::ColumnarValue; pub use expr::{ diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index 35fec509c95a..471ed0b975b0 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -283,9 +283,6 @@ impl TreeNode for Expr { .update_data(|be| Expr::Sort(Sort::new(be, asc, nulls_first))), Expr::ScalarFunction(ScalarFunction { func_def, args }) => { transform_vec(args, &mut f)?.map_data(|new_args| match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - Ok(Expr::ScalarFunction(ScalarFunction::new(fun, new_args))) - } ScalarFunctionDefinition::UDF(fun) => { Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, new_args))) } diff --git a/datafusion/functions/src/math/coalesce.rs b/datafusion/functions/src/math/coalesce.rs new file mode 100644 index 000000000000..3e16113bbd05 --- /dev/null +++ b/datafusion/functions/src/math/coalesce.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use arrow::array::{new_null_array, BooleanArray}; +use arrow::compute::kernels::zip::zip; +use arrow::compute::{and, is_not_null, is_null}; +use arrow::datatypes::DataType; + +use datafusion_common::{exec_err, Result}; +use datafusion_expr::type_coercion::functions::data_types; +use datafusion_expr::ColumnarValue; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug)] +pub struct CoalesceFunc { + signature: Signature, +} + +impl Default for CoalesceFunc { + fn default() -> Self { + CoalesceFunc::new() + } +} + +impl CoalesceFunc { + pub fn new() -> Self { + Self { + signature: Signature::variadic_equal(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for CoalesceFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "coalesce" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + // COALESCE has multiple args and they might get coerced, get a preview of this + let coerced_types = data_types(arg_types, self.signature()); + coerced_types.map(|types| types[0].clone()) + } + + /// coalesce evaluates to the first value which is not NULL + fn invoke(&self, args: &[ColumnarValue]) -> Result { + // do not accept 0 arguments. + if args.is_empty() { + return exec_err!( + "coalesce was called with {} arguments. It requires at least 1.", + args.len() + ); + } + + let return_type = args[0].data_type(); + let mut return_array = args.iter().filter_map(|x| match x { + ColumnarValue::Array(array) => Some(array.len()), + _ => None, + }); + + if let Some(size) = return_array.next() { + // start with nulls as default output + let mut current_value = new_null_array(&return_type, size); + let mut remainder = BooleanArray::from(vec![true; size]); + + for arg in args { + match arg { + ColumnarValue::Array(ref array) => { + let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?; + current_value = zip(&to_apply, array, ¤t_value)?; + remainder = and(&remainder, &is_null(array)?)?; + } + ColumnarValue::Scalar(value) => { + if value.is_null() { + continue; + } else { + let last_value = value.to_scalar()?; + current_value = zip(&remainder, &last_value, ¤t_value)?; + break; + } + } + } + if remainder.iter().all(|x| x == Some(false)) { + break; + } + } + Ok(ColumnarValue::Array(current_value)) + } else { + let result = args + .iter() + .filter_map(|x| match x { + ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()), + _ => None, + }) + .next() + .unwrap_or_else(|| args[0].clone()); + Ok(result) + } + } +} + +#[cfg(test)] +mod test { + use arrow::datatypes::DataType; + + use datafusion_expr::ScalarUDFImpl; + + use crate::math; + + #[test] + fn test_coalesce_return_types() { + let coalesce = math::coalesce::CoalesceFunc::new(); + let return_type = coalesce + .return_type(&[DataType::Date32, DataType::Date32]) + .unwrap(); + assert_eq!(return_type, DataType::Date32); + } +} diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs index b6e8d26b6460..1d9e5d94a90d 100644 --- a/datafusion/functions/src/math/mod.rs +++ b/datafusion/functions/src/math/mod.rs @@ -21,6 +21,7 @@ use datafusion_expr::ScalarUDF; use std::sync::Arc; pub mod abs; +pub mod coalesce; pub mod cot; pub mod factorial; pub mod gcd; @@ -46,6 +47,7 @@ make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, Some(vec![Some(true)])); make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, Some(vec![Some(true)])); make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, None); make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, Some(vec![Some(true)])); +make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce); make_math_unary_udf!(CosFunc, COS, cos, cos, None); make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None); make_udf_function!(cot::CotFunc, COT, cot); @@ -128,6 +130,11 @@ pub mod expr_fn { super::ceil().call(vec![num]) } + #[doc = "returns `coalesce(args...)`, which evaluates to the value of the first [Expr] which is not NULL"] + pub fn coalesce(args: Vec) -> Expr { + super::coalesce().call(args) + } + #[doc = "cosine"] pub fn cos(num: Expr) -> Expr { super::cos().call(vec![num]) @@ -282,6 +289,7 @@ pub fn functions() -> Vec> { atanh(), cbrt(), ceil(), + coalesce(), cos(), cosh(), cot(), diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index ac96decbdd80..ba06cb12b11e 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -306,16 +306,6 @@ impl TreeNodeRewriter for TypeCoercionRewriter { Ok(Transformed::yes(Expr::Case(case))) } Expr::ScalarFunction(ScalarFunction { func_def, args }) => match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - let new_args = coerce_arguments_for_signature( - args.as_slice(), - &self.schema, - &fun.signature(), - )?; - Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction::new( - fun, new_args, - )))) - } ScalarFunctionDefinition::UDF(fun) => { let new_expr = coerce_arguments_for_signature( args.as_slice(), diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index bb14f75446df..8502ac36ad09 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -525,9 +525,6 @@ impl<'a> ConstEvaluator<'a> { | Expr::Wildcard { .. } | Expr::Placeholder(_) => false, Expr::ScalarFunction(ScalarFunction { func_def, .. }) => match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - Self::volatility_ok(fun.volatility()) - } ScalarFunctionDefinition::UDF(fun) => { Self::volatility_ok(fun.signature().volatility) } diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs deleted file mode 100644 index 87d63bfd32e2..000000000000 --- a/datafusion/physical-expr/src/conditional_expressions.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{new_null_array, Array, BooleanArray}; -use arrow::compute::kernels::zip::zip; -use arrow::compute::{and, is_not_null, is_null}; - -use datafusion_common::{exec_err, Result}; -use datafusion_expr::ColumnarValue; - -/// coalesce evaluates to the first value which is not NULL -pub fn coalesce(args: &[ColumnarValue]) -> Result { - // do not accept 0 arguments. - if args.is_empty() { - return exec_err!( - "coalesce was called with {} arguments. It requires at least 1.", - args.len() - ); - } - - let return_type = args[0].data_type(); - let mut return_array = args.iter().filter_map(|x| match x { - ColumnarValue::Array(array) => Some(array.len()), - _ => None, - }); - - if let Some(size) = return_array.next() { - // start with nulls as default output - let mut current_value = new_null_array(&return_type, size); - let mut remainder = BooleanArray::from(vec![true; size]); - - for arg in args { - match arg { - ColumnarValue::Array(ref array) => { - let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?; - current_value = zip(&to_apply, array, ¤t_value)?; - remainder = and(&remainder, &is_null(array)?)?; - } - ColumnarValue::Scalar(value) => { - if value.is_null() { - continue; - } else { - let last_value = value.to_scalar()?; - current_value = zip(&remainder, &last_value, ¤t_value)?; - break; - } - } - } - if remainder.iter().all(|x| x == Some(false)) { - break; - } - } - Ok(ColumnarValue::Array(current_value)) - } else { - let result = args - .iter() - .filter_map(|x| match x { - ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()), - _ => None, - }) - .next() - .unwrap_or_else(|| args[0].clone()); - Ok(result) - } -} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 656ce711a0b0..875fe7ac3be1 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -37,47 +37,14 @@ use arrow::{array::ArrayRef, datatypes::Schema}; use arrow_array::Array; use datafusion_common::{DFSchema, Result, ScalarValue}; -use datafusion_expr::execution_props::ExecutionProps; pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ - type_coercion::functions::data_types, BuiltinScalarFunction, ColumnarValue, - ScalarFunctionImplementation, + type_coercion::functions::data_types, ColumnarValue, ScalarFunctionImplementation, }; use datafusion_expr::{Expr, ScalarFunctionDefinition, ScalarUDF}; use crate::sort_properties::SortProperties; -use crate::{conditional_expressions, PhysicalExpr, ScalarFunctionExpr}; - -/// Create a physical (function) expression. -/// This function errors when `args`' can't be coerced to a valid argument type of the function. -pub fn create_builtin_physical_expr( - fun: &BuiltinScalarFunction, - input_phy_exprs: &[Arc], - input_schema: &Schema, - _execution_props: &ExecutionProps, -) -> Result> { - let input_expr_types = input_phy_exprs - .iter() - .map(|e| e.data_type(input_schema)) - .collect::>>()?; - - // verify that input data types is consistent with function's `TypeSignature` - data_types(&input_expr_types, &fun.signature())?; - - let data_type = fun.return_type(&input_expr_types)?; - - let monotonicity = fun.monotonicity(); - - let fun_def = ScalarFunctionDefinition::BuiltIn(*fun); - Ok(Arc::new(ScalarFunctionExpr::new( - &format!("{fun}"), - fun_def, - input_phy_exprs.to_vec(), - data_type, - monotonicity, - fun.signature().type_signature.supports_zero_argument(), - ))) -} +use crate::{PhysicalExpr, ScalarFunctionExpr}; /// Create a physical (function) expression. /// This function errors when `args`' can't be coerced to a valid argument type of the function. @@ -199,24 +166,6 @@ where }) } -/// Create a physical scalar function. -pub fn create_physical_fun( - fun: &BuiltinScalarFunction, -) -> Result { - Ok(match fun { - // string functions - BuiltinScalarFunction::Coalesce => Arc::new(conditional_expressions::coalesce), - }) -} - -#[deprecated( - since = "32.0.0", - note = "Moved to `expr` crate. Please use `BuiltinScalarFunction::monotonicity()` instead" -)] -pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option { - fun.monotonicity() -} - /// Determines a [`ScalarFunctionExpr`]'s monotonicity for the given arguments /// and the function's behavior depending on its arguments. pub fn out_ordering( diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index aabcf42fe7c4..e0f19ad133e5 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -18,7 +18,6 @@ pub mod aggregate; pub mod analysis; pub mod binary_map; -pub mod conditional_expressions; pub mod equivalence; pub mod expressions; pub mod functions; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 20626818c83b..bf7b52f1c147 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - expressions::{self, binary, like, Column, Literal}, - functions, udf, PhysicalExpr, -}; +use std::sync::Arc; + use arrow::datatypes::Schema; + use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, }; @@ -31,7 +30,11 @@ use datafusion_expr::{ binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Like, Operator, ScalarFunctionDefinition, TryCast, }; -use std::sync::Arc; + +use crate::{ + expressions::{self, binary, like, Column, Literal}, + udf, PhysicalExpr, +}; /// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 /// AS int)`. @@ -306,14 +309,6 @@ pub fn create_physical_expr( create_physical_exprs(args, input_dfschema, execution_props)?; match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - functions::create_builtin_physical_expr( - fun, - &physical_args, - input_schema, - execution_props, - ) - } ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( fun.clone().as_ref(), &physical_args, @@ -390,12 +385,14 @@ where #[cfg(test)] mod tests { - use super::*; use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; + use datafusion_common::{DFSchema, Result}; use datafusion_expr::{col, lit}; + use super::*; + #[test] fn test_create_physical_expr_scalar_input_output() -> Result<()> { let expr = col("letter").eq(lit("A")); diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index d34084236690..9ae9f3dee3e7 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -34,19 +34,19 @@ use std::fmt::{self, Debug, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use crate::functions::{create_physical_fun, out_ordering}; -use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; -use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; - use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; + use datafusion_common::{internal_err, Result}; use datafusion_expr::{ - expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity, - ScalarFunctionDefinition, + expr_vec_fmt, ColumnarValue, FuncMonotonicity, ScalarFunctionDefinition, }; +use crate::functions::out_ordering; +use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; +use crate::sort_properties::SortProperties; +use crate::PhysicalExpr; + /// Physical expression of a scalar function pub struct ScalarFunctionExpr { fun: ScalarFunctionDefinition, @@ -122,7 +122,7 @@ impl ScalarFunctionExpr { } impl fmt::Display for ScalarFunctionExpr { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "{}({})", self.name, expr_vec_fmt!(self.args)) } } @@ -144,24 +144,11 @@ impl PhysicalExpr for ScalarFunctionExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { // evaluate the arguments, if there are no arguments we'll instead pass in a null array // indicating the batch size (as a convention) - let inputs = match ( - self.args.is_empty(), - self.name.parse::(), - ) { - // MakeArray support zero argument but has the different behavior from the array with one null. - (true, Ok(scalar_fun)) - if scalar_fun - .signature() - .type_signature - .supports_zero_argument() => - { - vec![ColumnarValue::create_null_array(batch.num_rows())] - } + let inputs = match self.args.is_empty() { // If the function supports zero argument, we pass in a null array indicating the batch size. // This is for user-defined functions. - (true, Err(_)) - if self.supports_zero_argument && self.name != "make_array" => - { + // MakeArray support zero argument but has the different behavior from the array with one null. + true if self.supports_zero_argument && self.name != "make_array" => { vec![ColumnarValue::create_null_array(batch.num_rows())] } _ => self @@ -173,10 +160,6 @@ impl PhysicalExpr for ScalarFunctionExpr { // evaluate the function match self.fun { - ScalarFunctionDefinition::BuiltIn(ref fun) => { - let fun = create_physical_fun(fun)?; - (fun)(&inputs) - } ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs), ScalarFunctionDefinition::Name(_) => { internal_err!( diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 13709bf394bf..7aa287055818 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -604,7 +604,7 @@ enum ScalarFunction { // 60 was Translate // Trim = 61; // Upper = 62; - Coalesce = 63; + // 63 was Coalesce // 64 was Power // 65 was StructFun // 66 was FromUnixtime diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 3a2be9907354..29724fa9cf66 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22792,7 +22792,6 @@ impl serde::Serialize for ScalarFunction { { let variant = match self { Self::Unknown => "unknown", - Self::Coalesce => "Coalesce", }; serializer.serialize_str(variant) } @@ -22805,7 +22804,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { { const FIELDS: &[&str] = &[ "unknown", - "Coalesce", ]; struct GeneratedVisitor; @@ -22847,7 +22845,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { { match value { "unknown" => Ok(ScalarFunction::Unknown), - "Coalesce" => Ok(ScalarFunction::Coalesce), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 487cfe01fba5..400a7bf75765 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2841,7 +2841,7 @@ impl JoinConstraint { pub enum ScalarFunction { /// 0 was Abs before /// The first enum value must be zero for open enums - Unknown = 0, + /// /// 1 was Acos /// 2 was Asin /// 3 was Atan @@ -2904,7 +2904,7 @@ pub enum ScalarFunction { /// 60 was Translate /// Trim = 61; /// Upper = 62; - /// + /// 63 was Coalesce /// 64 was Power /// 65 was StructFun /// 66 was FromUnixtime @@ -2978,7 +2978,7 @@ pub enum ScalarFunction { /// 136 was ToChar /// 137 was ToDate /// 138 was ToUnixtime - Coalesce = 63, + Unknown = 0, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2988,14 +2988,12 @@ impl ScalarFunction { pub fn as_str_name(&self) -> &'static str { match self { ScalarFunction::Unknown => "unknown", - ScalarFunction::Coalesce => "Coalesce", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "unknown" => Some(Self::Unknown), - "Coalesce" => Some(Self::Coalesce), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 4ccff9e7aa62..c0898db6f671 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -37,11 +37,10 @@ use datafusion_expr::expr::Unnest; use datafusion_expr::expr::{Alias, Placeholder}; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ - coalesce, expr::{self, InList, Sort, WindowFunction}, logical_plan::{PlanType, StringifiedPlan}, - AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, - Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet, + AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, + GetFieldAccess, GetIndexedField, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -412,16 +411,6 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan { } } -impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { - fn from(f: &protobuf::ScalarFunction) -> Self { - use protobuf::ScalarFunction; - match f { - ScalarFunction::Unknown => todo!(), - ScalarFunction::Coalesce => Self::Coalesce, - } - } -} - impl From for AggregateFunction { fn from(agg_fun: protobuf::AggregateFunction) -> Self { match agg_fun { @@ -1278,13 +1267,9 @@ pub fn parse_expr( ExprType::ScalarFunction(expr) => { let scalar_function = protobuf::ScalarFunction::try_from(expr.fun) .map_err(|_| Error::unknown("ScalarFunction", expr.fun))?; - let args = &expr.args; match scalar_function { ScalarFunction::Unknown => Err(proto_error("Unknown scalar function")), - ScalarFunction::Coalesce => { - Ok(coalesce(parse_exprs(args, registry, codec)?)) - } } } ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 7ad39df2c7ed..812a8bb85f87 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -21,20 +21,6 @@ use std::sync::Arc; -use crate::protobuf::{ - self, - arrow_type::ArrowTypeEnum, - plan_type::PlanTypeEnum::{ - AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan, - FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan, - InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan, - OptimizedPhysicalPlan, - }, - AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, - OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, - UnionField, UnionValue, -}; - use arrow::{ array::ArrayRef, datatypes::{ @@ -44,6 +30,7 @@ use arrow::{ ipc::writer::{DictionaryTracker, IpcDataGenerator}, record_batch::RecordBatch, }; + use datafusion_common::{ Column, Constraint, Constraints, DFSchema, DFSchemaRef, ScalarValue, TableReference, }; @@ -54,8 +41,22 @@ use datafusion_expr::expr::{ }; use datafusion_expr::{ logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction, - BuiltInWindowFunction, BuiltinScalarFunction, Expr, JoinConstraint, JoinType, - TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, + BuiltInWindowFunction, Expr, JoinConstraint, JoinType, TryCast, WindowFrame, + WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, +}; + +use crate::protobuf::{ + self, + arrow_type::ArrowTypeEnum, + plan_type::PlanTypeEnum::{ + AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan, + FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan, + InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan, + OptimizedPhysicalPlan, + }, + AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, + OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, + UnionField, UnionValue, }; use super::LogicalExtensionCodec; @@ -70,8 +71,6 @@ pub enum Error { InvalidTimeUnit(TimeUnit), - UnsupportedScalarFunction(BuiltinScalarFunction), - NotImplemented(String), } @@ -93,9 +92,6 @@ impl std::fmt::Display for Error { "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}" ) } - Self::UnsupportedScalarFunction(function) => { - write!(f, "Unsupported scalar function {function:?}") - } Self::NotImplemented(s) => { write!(f, "Not implemented: {s}") } @@ -774,17 +770,6 @@ pub fn serialize_expr( Expr::ScalarFunction(ScalarFunction { func_def, args }) => { let args = serialize_exprs(args, codec)?; match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - let fun: protobuf::ScalarFunction = fun.try_into()?; - protobuf::LogicalExprNode { - expr_type: Some(ExprType::ScalarFunction( - protobuf::ScalarFunctionNode { - fun: fun.into(), - args, - }, - )), - } - } ScalarFunctionDefinition::UDF(fun) => { let mut buf = Vec::new(); let _ = codec.try_encode_udf(fun.as_ref(), &mut buf); @@ -1402,18 +1387,6 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { } } -impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { - type Error = Error; - - fn try_from(scalar: &BuiltinScalarFunction) -> Result { - let scalar_function = match scalar { - BuiltinScalarFunction::Coalesce => Self::Coalesce, - }; - - Ok(scalar_function) - } -} - impl From<&TimeUnit> for protobuf::TimeUnit { fn from(val: &TimeUnit) -> Self { match val { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index ffc165e725b0..12a3288a76bb 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -34,7 +34,6 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; -use datafusion::execution::context::ExecutionProps; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::WindowFunctionDefinition; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; @@ -44,7 +43,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::windows::create_window_expr; use datafusion::physical_plan::{ - functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, + ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, }; use datafusion_common::config::{ ColumnOptions, CsvOptions, FormatOptions, JsonOptions, ParquetOptions, @@ -340,24 +339,10 @@ pub fn parse_physical_expr( convert_required!(e.arrow_type)?, )), ExprType::ScalarFunction(e) => { - let scalar_function = - protobuf::ScalarFunction::try_from(e.fun).map_err(|_| { - proto_error( - format!("Received an unknown scalar function: {}", e.fun,), - ) - })?; - - let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?; - - // TODO Do not create new the ExecutionProps - let execution_props = ExecutionProps::new(); - - functions::create_builtin_physical_expr( - &(&scalar_function).into(), - &args, - input_schema, - &execution_props, - )? + return Err(proto_error(format!( + "Received an unknown scalar function: {}", + e.fun, + ))); } ExprType::ScalarUdf(e) => { let udf = match &e.fun_definition { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b4c23e4d0c3c..7b6f745fed6a 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -18,13 +18,11 @@ use std::{ convert::{TryFrom, TryInto}, - str::FromStr, sync::Arc, }; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; -use datafusion::logical_expr::BuiltinScalarFunction; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ @@ -545,44 +543,30 @@ pub fn serialize_physical_expr( }) } else if let Some(expr) = expr.downcast_ref::() { let args = serialize_physical_exprs(expr.args().to_vec(), codec)?; - if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) { - let fun: protobuf::ScalarFunction = (&fun).try_into()?; - - Ok(protobuf::PhysicalExprNode { - expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarFunction( - protobuf::PhysicalScalarFunctionNode { - name: expr.name().to_string(), - fun: fun.into(), - args, - return_type: Some(expr.return_type().try_into()?), - }, - )), - }) - } else { - let mut buf = Vec::new(); - match expr.fun() { - ScalarFunctionDefinition::UDF(udf) => { - codec.try_encode_udf(udf, &mut buf)?; - } - _ => { - return not_impl_err!( - "Proto serialization error: Trying to serialize a unresolved function" - ); - } - } - let fun_definition = if buf.is_empty() { None } else { Some(buf) }; - Ok(protobuf::PhysicalExprNode { - expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( - protobuf::PhysicalScalarUdfNode { - name: expr.name().to_string(), - args, - fun_definition, - return_type: Some(expr.return_type().try_into()?), - }, - )), - }) + let mut buf = Vec::new(); + match expr.fun() { + ScalarFunctionDefinition::UDF(udf) => { + codec.try_encode_udf(udf, &mut buf)?; + } + _ => { + return not_impl_err!( + "Proto serialization error: Trying to serialize a unresolved function" + ); + } } + + let fun_definition = if buf.is_empty() { None } else { Some(buf) }; + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( + protobuf::PhysicalScalarUdfNode { + name: expr.name().to_string(), + args, + fun_definition, + return_type: Some(expr.return_type().try_into()?), + }, + )), + }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( diff --git a/datafusion/proto/tests/cases/serialize.rs b/datafusion/proto/tests/cases/serialize.rs index 972382b841d5..cc683e778ebc 100644 --- a/datafusion/proto/tests/cases/serialize.rs +++ b/datafusion/proto/tests/cases/serialize.rs @@ -24,6 +24,7 @@ use datafusion::execution::FunctionRegistry; use datafusion::prelude::SessionContext; use datafusion_expr::{col, create_udf, lit, ColumnarValue}; use datafusion_expr::{Expr, Volatility}; +use datafusion_functions::string; use datafusion_proto::bytes::Serializeable; use datafusion_proto::logical_plan::to_proto::serialize_expr; use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; @@ -252,17 +253,15 @@ fn context_with_udf() -> SessionContext { fn test_expression_serialization_roundtrip() { use datafusion_common::ScalarValue; use datafusion_expr::expr::ScalarFunction; - use datafusion_expr::BuiltinScalarFunction; use datafusion_proto::logical_plan::from_proto::parse_expr; - use strum::IntoEnumIterator; let ctx = SessionContext::new(); let lit = Expr::Literal(ScalarValue::Utf8(None)); - for builtin_fun in BuiltinScalarFunction::iter() { + for function in string::functions() { // default to 4 args (though some exprs like substr have error checking) let num_args = 4; let args: Vec<_> = std::iter::repeat(&lit).take(num_args).cloned().collect(); - let expr = Expr::ScalarFunction(ScalarFunction::new(builtin_fun, args)); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(function, args)); let extension_codec = DefaultLogicalExtensionCodec {}; let proto = serialize_expr(&expr, &extension_codec).unwrap(); diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index c225afec58d6..68cba15634d5 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -27,7 +27,7 @@ use datafusion_expr::{ }; use datafusion_expr::{ expr::{ScalarFunction, Unnest}, - BuiltInWindowFunction, BuiltinScalarFunction, + BuiltInWindowFunction, }; use sqlparser::ast::{ Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr, WindowType, @@ -55,7 +55,6 @@ pub fn suggest_valid_function( // All scalar functions and aggregate functions let mut funcs = Vec::new(); - funcs.extend(BuiltinScalarFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udfs_names()); funcs.extend(AggregateFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udafs_names()); @@ -111,7 +110,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { crate::utils::normalize_ident(name.0[0].clone()) }; - // user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function + // user-defined function (UDF) should have precedence if let Some(fm) = self.context_provider.get_function_meta(&name) { let args = self.function_args_to_expr(args, schema, planner_context)?; return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args))); @@ -129,12 +128,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { return Ok(Expr::Unnest(Unnest::new(expr))); } - // next, scalar built-in - if let Ok(fun) = BuiltinScalarFunction::from_str(&name) { - let args = self.function_args_to_expr(args, schema, planner_context)?; - return Ok(Expr::ScalarFunction(ScalarFunction::new(fun, args))); - }; - if !order_by.is_empty() && is_function_window { return plan_err!( "Aggregate ORDER BY is not implemented for window functions" diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 491b9b810687..db9393ef7e5c 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -1625,7 +1625,7 @@ drop table ts_data_secs ########## -## Timezone impact on builtin scalar functions +## Timezone impact on scalar functions # # server time = +07 ########## @@ -1690,7 +1690,7 @@ SELECT date_part('hour', TIMESTAMPTZ '2000-01-01T01:01:01Z') as part ########## -## Timezone impact on builtin scalar functions +## Timezone impact on scalar functions # # server time = UTC ########## @@ -1773,7 +1773,7 @@ SELECT date_part('hour', TIMESTAMPTZ '2000-01-01T01:01:01+07') as part ########## -## Timezone impact on builtin scalar functions +## Timezone impact on scalar functions # # irregular offsets ########## diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 73782ab27f71..fab4528c0b42 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -23,12 +23,12 @@ use datafusion::common::{ use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::{ - aggregate_function, expr::find_df_window_func, BinaryExpr, BuiltinScalarFunction, - Case, Expr, LogicalPlan, Operator, + aggregate_function, expr::find_df_window_func, BinaryExpr, Case, Expr, LogicalPlan, + Operator, ScalarUDF, }; use datafusion::logical_expr::{ expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning, - Repartition, ScalarUDF, Subquery, WindowFrameBound, WindowFrameUnits, + Repartition, Subquery, WindowFrameBound, WindowFrameUnits, }; use datafusion::prelude::JoinType; use datafusion::sql::TableReference; @@ -75,7 +75,6 @@ use crate::variation_const::{ }; enum ScalarFunctionType { - Builtin(BuiltinScalarFunction), Op(Operator), Expr(BuiltinExprBuilder), Udf(Arc), @@ -127,10 +126,6 @@ fn scalar_function_type_from_str( return Ok(ScalarFunctionType::Op(op)); } - if let Ok(fun) = BuiltinScalarFunction::from_str(name) { - return Ok(ScalarFunctionType::Builtin(fun)); - } - if let Some(builder) = BuiltinExprBuilder::try_from_name(name) { return Ok(ScalarFunctionType::Expr(builder)); } @@ -910,18 +905,6 @@ pub async fn from_substrait_rex( expr::ScalarFunction::new_udf(fun, args), ))) } - ScalarFunctionType::Builtin(fun) => { - let args = decode_arguments( - ctx, - input_schema, - extensions, - f.arguments.as_slice(), - ) - .await?; - Ok(Arc::new(Expr::ScalarFunction(expr::ScalarFunction::new( - fun, args, - )))) - } ScalarFunctionType::Op(op) => { if f.arguments.len() != 2 { return not_impl_err!(