From 06a5ee45bc39d5df7ccc2bf5150c50281d30fac2 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sun, 6 Feb 2022 16:17:12 +0800 Subject: [PATCH] s --- datafusion-expr/src/expr.rs | 296 +++++++++++++++++- datafusion-expr/src/field_util.rs | 69 ++++ datafusion-expr/src/lib.rs | 1 + datafusion-expr/src/window_function.rs | 78 +++++ datafusion/src/logical_plan/expr.rs | 285 ----------------- .../src/physical_plan/window_functions.rs | 77 ----- 6 files changed, 440 insertions(+), 366 deletions(-) create mode 100644 datafusion-expr/src/field_util.rs diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index 618defb00428a..0fb682cea9670 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -17,10 +17,13 @@ //! Expressions +use crate::field_util::get_indexed_field; use crate::operator::Operator; +use crate::window_frame; +use crate::window_function; use arrow::{compute::can_cast_types, datatypes::DataType}; use datafusion_common::{ - DFField, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue, + Column, DFField, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue, }; use std::fmt; use std::hash::{BuildHasher, Hash, Hasher}; @@ -183,7 +186,7 @@ pub enum Expr { /// Represents the call of an aggregate built-in function with arguments. AggregateFunction { /// Name of the function - fun: aggregates::AggregateFunction, + fun: aggregate::AggregateFunction, /// List of expressions to feed to the functions as arguments args: Vec, /// Whether this is a DISTINCT aggregation or not @@ -192,7 +195,7 @@ pub enum Expr { /// Represents the call of a window function with arguments. WindowFunction { /// Name of the function - fun: window_functions::WindowFunction, + fun: window_function::WindowFunction, /// List of expressions to feed to the functions as arguments args: Vec, /// List of partition by expressions @@ -200,7 +203,7 @@ pub enum Expr { /// List of order by expressions order_by: Vec, /// Window frame - window_frame: Option, + window_frame: Option, }, /// aggregate function AggregateUDF { @@ -981,3 +984,288 @@ pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr { right: Box::new(r), } } + +impl fmt::Debug for Expr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias), + Expr::Column(c) => write!(f, "{}", c), + Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")), + Expr::Literal(v) => write!(f, "{:?}", v), + Expr::Case { + expr, + when_then_expr, + else_expr, + .. + } => { + write!(f, "CASE ")?; + if let Some(e) = expr { + write!(f, "{:?} ", e)?; + } + for (w, t) in when_then_expr { + write!(f, "WHEN {:?} THEN {:?} ", w, t)?; + } + if let Some(e) = else_expr { + write!(f, "ELSE {:?} ", e)?; + } + write!(f, "END") + } + Expr::Cast { expr, data_type } => { + write!(f, "CAST({:?} AS {:?})", expr, data_type) + } + Expr::TryCast { expr, data_type } => { + write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type) + } + Expr::Not(expr) => write!(f, "NOT {:?}", expr), + Expr::Negative(expr) => write!(f, "(- {:?})", expr), + Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr), + Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr), + Expr::BinaryExpr { left, op, right } => { + write!(f, "{:?} {} {:?}", left, op, right) + } + Expr::Sort { + expr, + asc, + nulls_first, + } => { + if *asc { + write!(f, "{:?} ASC", expr)?; + } else { + write!(f, "{:?} DESC", expr)?; + } + if *nulls_first { + write!(f, " NULLS FIRST") + } else { + write!(f, " NULLS LAST") + } + } + Expr::ScalarFunction { fun, args, .. } => { + fmt_function(f, &fun.to_string(), false, args, false) + } + Expr::ScalarUDF { fun, ref args, .. } => { + fmt_function(f, &fun.name, false, args, false) + } + Expr::WindowFunction { + fun, + args, + partition_by, + order_by, + window_frame, + } => { + fmt_function(f, &fun.to_string(), false, args, false)?; + if !partition_by.is_empty() { + write!(f, " PARTITION BY {:?}", partition_by)?; + } + if !order_by.is_empty() { + write!(f, " ORDER BY {:?}", order_by)?; + } + if let Some(window_frame) = window_frame { + write!( + f, + " {} BETWEEN {} AND {}", + window_frame.units, + window_frame.start_bound, + window_frame.end_bound + )?; + } + Ok(()) + } + Expr::AggregateFunction { + fun, + distinct, + ref args, + .. + } => fmt_function(f, &fun.to_string(), *distinct, args, true), + Expr::AggregateUDF { fun, ref args, .. } => { + fmt_function(f, &fun.name, false, args, false) + } + Expr::Between { + expr, + negated, + low, + high, + } => { + if *negated { + write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high) + } else { + write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high) + } + } + Expr::InList { + expr, + list, + negated, + } => { + if *negated { + write!(f, "{:?} NOT IN ({:?})", expr, list) + } else { + write!(f, "{:?} IN ({:?})", expr, list) + } + } + Expr::Wildcard => write!(f, "*"), + Expr::GetIndexedField { ref expr, key } => { + write!(f, "({:?})[{}]", expr, key) + } + } + } +} + +/// Returns a readable name of an expression based on the input schema. +/// This function recursively transverses the expression for names such as "CAST(a > 2)". +fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { + match e { + Expr::Alias(_, name) => Ok(name.clone()), + Expr::Column(c) => Ok(c.flat_name()), + Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")), + Expr::Literal(value) => Ok(format!("{:?}", value)), + Expr::BinaryExpr { left, op, right } => { + let left = create_name(left, input_schema)?; + let right = create_name(right, input_schema)?; + Ok(format!("{} {} {}", left, op, right)) + } + Expr::Case { + expr, + when_then_expr, + else_expr, + } => { + let mut name = "CASE ".to_string(); + if let Some(e) = expr { + let e = create_name(e, input_schema)?; + name += &format!("{} ", e); + } + for (w, t) in when_then_expr { + let when = create_name(w, input_schema)?; + let then = create_name(t, input_schema)?; + name += &format!("WHEN {} THEN {} ", when, then); + } + if let Some(e) = else_expr { + let e = create_name(e, input_schema)?; + name += &format!("ELSE {} ", e); + } + name += "END"; + Ok(name) + } + Expr::Cast { expr, data_type } => { + let expr = create_name(expr, input_schema)?; + Ok(format!("CAST({} AS {:?})", expr, data_type)) + } + Expr::TryCast { expr, data_type } => { + let expr = create_name(expr, input_schema)?; + Ok(format!("TRY_CAST({} AS {:?})", expr, data_type)) + } + Expr::Not(expr) => { + let expr = create_name(expr, input_schema)?; + Ok(format!("NOT {}", expr)) + } + Expr::Negative(expr) => { + let expr = create_name(expr, input_schema)?; + Ok(format!("(- {})", expr)) + } + Expr::IsNull(expr) => { + let expr = create_name(expr, input_schema)?; + Ok(format!("{} IS NULL", expr)) + } + Expr::IsNotNull(expr) => { + let expr = create_name(expr, input_schema)?; + Ok(format!("{} IS NOT NULL", expr)) + } + Expr::GetIndexedField { expr, key } => { + let expr = create_name(expr, input_schema)?; + Ok(format!("{}[{}]", expr, key)) + } + Expr::ScalarFunction { fun, args, .. } => { + create_function_name(&fun.to_string(), false, args, input_schema) + } + Expr::ScalarUDF { fun, args, .. } => { + create_function_name(&fun.name, false, args, input_schema) + } + Expr::WindowFunction { + fun, + args, + window_frame, + partition_by, + order_by, + } => { + let mut parts: Vec = vec![create_function_name( + &fun.to_string(), + false, + args, + input_schema, + )?]; + if !partition_by.is_empty() { + parts.push(format!("PARTITION BY {:?}", partition_by)); + } + if !order_by.is_empty() { + parts.push(format!("ORDER BY {:?}", order_by)); + } + if let Some(window_frame) = window_frame { + parts.push(format!("{}", window_frame)); + } + Ok(parts.join(" ")) + } + Expr::AggregateFunction { + fun, + distinct, + args, + .. + } => create_function_name(&fun.to_string(), *distinct, args, input_schema), + Expr::AggregateUDF { fun, args } => { + let mut names = Vec::with_capacity(args.len()); + for e in args { + names.push(create_name(e, input_schema)?); + } + Ok(format!("{}({})", fun.name, names.join(","))) + } + Expr::InList { + expr, + list, + negated, + } => { + let expr = create_name(expr, input_schema)?; + let list = list.iter().map(|expr| create_name(expr, input_schema)); + if *negated { + Ok(format!("{} NOT IN ({:?})", expr, list)) + } else { + Ok(format!("{} IN ({:?})", expr, list)) + } + } + Expr::Between { + expr, + negated, + low, + high, + } => { + let expr = create_name(expr, input_schema)?; + let low = create_name(low, input_schema)?; + let high = create_name(high, input_schema)?; + if *negated { + Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high)) + } else { + Ok(format!("{} BETWEEN {} AND {}", expr, low, high)) + } + } + Expr::Sort { .. } => Err(DataFusionError::Internal( + "Create name does not support sort expression".to_string(), + )), + Expr::Wildcard => Err(DataFusionError::Internal( + "Create name does not support wildcard".to_string(), + )), + } +} + +fn create_function_name( + fun: &str, + distinct: bool, + args: &[Expr], + input_schema: &DFSchema, +) -> Result { + let names: Vec = args + .iter() + .map(|e| create_name(e, input_schema)) + .collect::>()?; + let distinct_str = match distinct { + true => "DISTINCT ", + false => "", + }; + Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) +} diff --git a/datafusion-expr/src/field_util.rs b/datafusion-expr/src/field_util.rs new file mode 100644 index 0000000000000..272c17b60887b --- /dev/null +++ b/datafusion-expr/src/field_util.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for complex field access + +use arrow::datatypes::{DataType, Field}; + +use crate::error::{DataFusionError, Result}; +use crate::scalar::ScalarValue; + +/// Returns the field access indexed by `key` from a [`DataType::List`] or [`DataType::Struct`] +/// # Error +/// Errors if +/// * the `data_type` is not a Struct or, +/// * there is no field key is not of the required index type +pub fn get_indexed_field(data_type: &DataType, key: &ScalarValue) -> Result { + match (data_type, key) { + (DataType::List(lt), ScalarValue::Int64(Some(i))) => { + if *i < 0 { + Err(DataFusionError::Plan(format!( + "List based indexed access requires a positive int, was {0}", + i + ))) + } else { + Ok(Field::new(&i.to_string(), lt.data_type().clone(), false)) + } + } + (DataType::Struct(fields), ScalarValue::Utf8(Some(s))) => { + if s.is_empty() { + Err(DataFusionError::Plan( + "Struct based indexed access requires a non empty string".to_string(), + )) + } else { + let field = fields.iter().find(|f| f.name() == s); + match field { + None => Err(DataFusionError::Plan(format!( + "Field {} not found in struct", + s + ))), + Some(f) => Ok(f.clone()), + } + } + } + (DataType::Struct(_), _) => Err(DataFusionError::Plan( + "Only utf8 strings are valid as an indexed field in a struct".to_string(), + )), + (DataType::List(_), _) => Err(DataFusionError::Plan( + "Only ints are valid as an indexed field in a list".to_string(), + )), + _ => Err(DataFusionError::Plan( + "The expression to get an indexed field is only valid for `List` types" + .to_string(), + )), + } +} diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs index 3561482cf3be2..924bd4ce36a66 100644 --- a/datafusion-expr/src/lib.rs +++ b/datafusion-expr/src/lib.rs @@ -17,6 +17,7 @@ mod aggregate_function; mod expr; +mod field_util; mod operator; mod window_function; diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs index 59523d6540b20..b57c5fb41def7 100644 --- a/datafusion-expr/src/window_function.rs +++ b/datafusion-expr/src/window_function.rs @@ -16,6 +16,7 @@ // under the License. use crate::aggregate_function::AggregateFunction; +use arrow::datatypes::DataType; use datafusion_common::{DataFusionError, Result}; use std::{fmt, str::FromStr}; @@ -132,6 +133,83 @@ impl FromStr for BuiltInWindowFunction { } } +/// Returns the datatype of the window function +pub fn return_type( + fun: &WindowFunction, + input_expr_types: &[DataType], +) -> Result { + match fun { + WindowFunction::AggregateFunction(fun) => { + aggregates::return_type(fun, input_expr_types) + } + WindowFunction::BuiltInWindowFunction(fun) => { + return_type_for_built_in(fun, input_expr_types) + } + } +} + +/// Returns the datatype of the built-in window function +pub(super) fn return_type_for_built_in( + fun: &BuiltInWindowFunction, + input_expr_types: &[DataType], +) -> Result { + // Note that this function *must* return the same type that the respective physical expression returns + // or the execution panics. + + // verify that this is a valid set of data types for this function + data_types(input_expr_types, &signature_for_built_in(fun))?; + + match fun { + BuiltInWindowFunction::RowNumber + | BuiltInWindowFunction::Rank + | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64), + BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { + Ok(DataType::Float64) + } + BuiltInWindowFunction::Ntile => Ok(DataType::UInt32), + BuiltInWindowFunction::Lag + | BuiltInWindowFunction::Lead + | BuiltInWindowFunction::FirstValue + | BuiltInWindowFunction::LastValue + | BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()), + } +} + +/// the signatures supported by the function `fun`. +pub fn signature(fun: &WindowFunction) -> Signature { + match fun { + WindowFunction::AggregateFunction(fun) => aggregates::signature(fun), + WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun), + } +} + +/// the signatures supported by the built-in window function `fun`. +pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature { + // note: the physical expression must accept the type returned by this function or the execution panics. + match fun { + BuiltInWindowFunction::RowNumber + | BuiltInWindowFunction::Rank + | BuiltInWindowFunction::DenseRank + | BuiltInWindowFunction::PercentRank + | BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable), + BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => Signature::one_of( + vec![ + TypeSignature::Any(1), + TypeSignature::Any(2), + TypeSignature::Any(3), + ], + Volatility::Immutable, + ), + BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => { + Signature::any(1, Volatility::Immutable) + } + BuiltInWindowFunction::Ntile => { + Signature::exact(vec![DataType::UInt64], Volatility::Immutable) + } + BuiltInWindowFunction::NthValue => Signature::any(2, Volatility::Immutable), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 94a35ae53ba0c..79ce6dc7ebe6c 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -765,291 +765,6 @@ pub fn create_udaf( ) } -impl fmt::Debug for Expr { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias), - Expr::Column(c) => write!(f, "{}", c), - Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")), - Expr::Literal(v) => write!(f, "{:?}", v), - Expr::Case { - expr, - when_then_expr, - else_expr, - .. - } => { - write!(f, "CASE ")?; - if let Some(e) = expr { - write!(f, "{:?} ", e)?; - } - for (w, t) in when_then_expr { - write!(f, "WHEN {:?} THEN {:?} ", w, t)?; - } - if let Some(e) = else_expr { - write!(f, "ELSE {:?} ", e)?; - } - write!(f, "END") - } - Expr::Cast { expr, data_type } => { - write!(f, "CAST({:?} AS {:?})", expr, data_type) - } - Expr::TryCast { expr, data_type } => { - write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type) - } - Expr::Not(expr) => write!(f, "NOT {:?}", expr), - Expr::Negative(expr) => write!(f, "(- {:?})", expr), - Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr), - Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr), - Expr::BinaryExpr { left, op, right } => { - write!(f, "{:?} {} {:?}", left, op, right) - } - Expr::Sort { - expr, - asc, - nulls_first, - } => { - if *asc { - write!(f, "{:?} ASC", expr)?; - } else { - write!(f, "{:?} DESC", expr)?; - } - if *nulls_first { - write!(f, " NULLS FIRST") - } else { - write!(f, " NULLS LAST") - } - } - Expr::ScalarFunction { fun, args, .. } => { - fmt_function(f, &fun.to_string(), false, args, false) - } - Expr::ScalarUDF { fun, ref args, .. } => { - fmt_function(f, &fun.name, false, args, false) - } - Expr::WindowFunction { - fun, - args, - partition_by, - order_by, - window_frame, - } => { - fmt_function(f, &fun.to_string(), false, args, false)?; - if !partition_by.is_empty() { - write!(f, " PARTITION BY {:?}", partition_by)?; - } - if !order_by.is_empty() { - write!(f, " ORDER BY {:?}", order_by)?; - } - if let Some(window_frame) = window_frame { - write!( - f, - " {} BETWEEN {} AND {}", - window_frame.units, - window_frame.start_bound, - window_frame.end_bound - )?; - } - Ok(()) - } - Expr::AggregateFunction { - fun, - distinct, - ref args, - .. - } => fmt_function(f, &fun.to_string(), *distinct, args, true), - Expr::AggregateUDF { fun, ref args, .. } => { - fmt_function(f, &fun.name, false, args, false) - } - Expr::Between { - expr, - negated, - low, - high, - } => { - if *negated { - write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high) - } else { - write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high) - } - } - Expr::InList { - expr, - list, - negated, - } => { - if *negated { - write!(f, "{:?} NOT IN ({:?})", expr, list) - } else { - write!(f, "{:?} IN ({:?})", expr, list) - } - } - Expr::Wildcard => write!(f, "*"), - Expr::GetIndexedField { ref expr, key } => { - write!(f, "({:?})[{}]", expr, key) - } - } - } -} - -fn create_function_name( - fun: &str, - distinct: bool, - args: &[Expr], - input_schema: &DFSchema, -) -> Result { - let names: Vec = args - .iter() - .map(|e| create_name(e, input_schema)) - .collect::>()?; - let distinct_str = match distinct { - true => "DISTINCT ", - false => "", - }; - Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) -} - -/// Returns a readable name of an expression based on the input schema. -/// This function recursively transverses the expression for names such as "CAST(a > 2)". -fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { - match e { - Expr::Alias(_, name) => Ok(name.clone()), - Expr::Column(c) => Ok(c.flat_name()), - Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")), - Expr::Literal(value) => Ok(format!("{:?}", value)), - Expr::BinaryExpr { left, op, right } => { - let left = create_name(left, input_schema)?; - let right = create_name(right, input_schema)?; - Ok(format!("{} {} {}", left, op, right)) - } - Expr::Case { - expr, - when_then_expr, - else_expr, - } => { - let mut name = "CASE ".to_string(); - if let Some(e) = expr { - let e = create_name(e, input_schema)?; - name += &format!("{} ", e); - } - for (w, t) in when_then_expr { - let when = create_name(w, input_schema)?; - let then = create_name(t, input_schema)?; - name += &format!("WHEN {} THEN {} ", when, then); - } - if let Some(e) = else_expr { - let e = create_name(e, input_schema)?; - name += &format!("ELSE {} ", e); - } - name += "END"; - Ok(name) - } - Expr::Cast { expr, data_type } => { - let expr = create_name(expr, input_schema)?; - Ok(format!("CAST({} AS {:?})", expr, data_type)) - } - Expr::TryCast { expr, data_type } => { - let expr = create_name(expr, input_schema)?; - Ok(format!("TRY_CAST({} AS {:?})", expr, data_type)) - } - Expr::Not(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("NOT {}", expr)) - } - Expr::Negative(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("(- {})", expr)) - } - Expr::IsNull(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("{} IS NULL", expr)) - } - Expr::IsNotNull(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("{} IS NOT NULL", expr)) - } - Expr::GetIndexedField { expr, key } => { - let expr = create_name(expr, input_schema)?; - Ok(format!("{}[{}]", expr, key)) - } - Expr::ScalarFunction { fun, args, .. } => { - create_function_name(&fun.to_string(), false, args, input_schema) - } - Expr::ScalarUDF { fun, args, .. } => { - create_function_name(&fun.name, false, args, input_schema) - } - Expr::WindowFunction { - fun, - args, - window_frame, - partition_by, - order_by, - } => { - let mut parts: Vec = vec![create_function_name( - &fun.to_string(), - false, - args, - input_schema, - )?]; - if !partition_by.is_empty() { - parts.push(format!("PARTITION BY {:?}", partition_by)); - } - if !order_by.is_empty() { - parts.push(format!("ORDER BY {:?}", order_by)); - } - if let Some(window_frame) = window_frame { - parts.push(format!("{}", window_frame)); - } - Ok(parts.join(" ")) - } - Expr::AggregateFunction { - fun, - distinct, - args, - .. - } => create_function_name(&fun.to_string(), *distinct, args, input_schema), - Expr::AggregateUDF { fun, args } => { - let mut names = Vec::with_capacity(args.len()); - for e in args { - names.push(create_name(e, input_schema)?); - } - Ok(format!("{}({})", fun.name, names.join(","))) - } - Expr::InList { - expr, - list, - negated, - } => { - let expr = create_name(expr, input_schema)?; - let list = list.iter().map(|expr| create_name(expr, input_schema)); - if *negated { - Ok(format!("{} NOT IN ({:?})", expr, list)) - } else { - Ok(format!("{} IN ({:?})", expr, list)) - } - } - Expr::Between { - expr, - negated, - low, - high, - } => { - let expr = create_name(expr, input_schema)?; - let low = create_name(low, input_schema)?; - let high = create_name(high, input_schema)?; - if *negated { - Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high)) - } else { - Ok(format!("{} BETWEEN {} AND {}", expr, low, high)) - } - } - Expr::Sort { .. } => Err(DataFusionError::Internal( - "Create name does not support sort expression".to_string(), - )), - Expr::Wildcard => Err(DataFusionError::Internal( - "Create name does not support wildcard".to_string(), - )), - } -} - /// Create field meta-data from an expression, for use in a result set schema pub fn exprlist_to_fields<'a>( expr: impl IntoIterator, diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index b8cc96a50490e..f81453e047663 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -35,83 +35,6 @@ use std::any::Any; use std::ops::Range; use std::sync::Arc; -/// Returns the datatype of the window function -pub fn return_type( - fun: &WindowFunction, - input_expr_types: &[DataType], -) -> Result { - match fun { - WindowFunction::AggregateFunction(fun) => { - aggregates::return_type(fun, input_expr_types) - } - WindowFunction::BuiltInWindowFunction(fun) => { - return_type_for_built_in(fun, input_expr_types) - } - } -} - -/// Returns the datatype of the built-in window function -pub(super) fn return_type_for_built_in( - fun: &BuiltInWindowFunction, - input_expr_types: &[DataType], -) -> Result { - // Note that this function *must* return the same type that the respective physical expression returns - // or the execution panics. - - // verify that this is a valid set of data types for this function - data_types(input_expr_types, &signature_for_built_in(fun))?; - - match fun { - BuiltInWindowFunction::RowNumber - | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64), - BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { - Ok(DataType::Float64) - } - BuiltInWindowFunction::Ntile => Ok(DataType::UInt32), - BuiltInWindowFunction::Lag - | BuiltInWindowFunction::Lead - | BuiltInWindowFunction::FirstValue - | BuiltInWindowFunction::LastValue - | BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()), - } -} - -/// the signatures supported by the function `fun`. -pub fn signature(fun: &WindowFunction) -> Signature { - match fun { - WindowFunction::AggregateFunction(fun) => aggregates::signature(fun), - WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun), - } -} - -/// the signatures supported by the built-in window function `fun`. -pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature { - // note: the physical expression must accept the type returned by this function or the execution panics. - match fun { - BuiltInWindowFunction::RowNumber - | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank - | BuiltInWindowFunction::PercentRank - | BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable), - BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => Signature::one_of( - vec![ - TypeSignature::Any(1), - TypeSignature::Any(2), - TypeSignature::Any(3), - ], - Volatility::Immutable, - ), - BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => { - Signature::any(1, Volatility::Immutable) - } - BuiltInWindowFunction::Ntile => { - Signature::exact(vec![DataType::UInt64], Volatility::Immutable) - } - BuiltInWindowFunction::NthValue => Signature::any(2, Volatility::Immutable), - } -} - /// Partition evaluator pub(crate) trait PartitionEvaluator { /// Whether the evaluator should be evaluated with rank