Skip to content

Commit

Permalink
refactor: remove type_coercion in PhysicalExpr. (#6575)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Jun 7, 2023
1 parent 786f222 commit 12b88ea
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 231 deletions.
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,7 @@ use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
pub use datafusion_physical_expr::{expressions, functions, hash_utils, udf};

#[cfg(test)]
mod tests {
Expand Down
35 changes: 12 additions & 23 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ use crate::physical_plan::{
cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile,
PhysicalSortExpr, RowNumber,
},
type_coercion::coerce,
udaf, ExecutionPlan, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::Schema;
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_expr::{
window_function::{signature_for_built_in, BuiltInWindowFunction, WindowFunction},
window_function::{BuiltInWindowFunction, WindowFunction},
WindowFrame,
};
use datafusion_physical_expr::window::{
Expand Down Expand Up @@ -133,8 +132,7 @@ fn create_built_in_window_expr(
BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)),
BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
BuiltInWindowFunction::Ntile => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let n: i64 = get_scalar_value_from_args(&coerced_args, 0)?
let n: i64 = get_scalar_value_from_args(args, 0)?
.ok_or_else(|| {
DataFusionError::Execution(
"NTILE requires at least 1 argument".to_string(),
Expand All @@ -145,33 +143,26 @@ fn create_built_in_window_expr(
Arc::new(Ntile::new(name, n))
}
BuiltInWindowFunction::Lag => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
let arg = args[0].clone();
let data_type = args[0].data_type(input_schema)?;
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)?
let shift_offset = get_scalar_value_from_args(args, 1)?
.map(|v| v.try_into())
.and_then(|v| v.ok());
let default_value = get_scalar_value_from_args(&coerced_args, 2)?;
let default_value = get_scalar_value_from_args(args, 2)?;
Arc::new(lag(name, data_type, arg, shift_offset, default_value))
}
BuiltInWindowFunction::Lead => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
let arg = args[0].clone();
let data_type = args[0].data_type(input_schema)?;
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)?
let shift_offset = get_scalar_value_from_args(args, 1)?
.map(|v| v.try_into())
.and_then(|v| v.ok());
let default_value = get_scalar_value_from_args(&coerced_args, 2)?;
let default_value = get_scalar_value_from_args(args, 2)?;
Arc::new(lead(name, data_type, arg, shift_offset, default_value))
}
BuiltInWindowFunction::NthValue => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
let n = coerced_args[1]
.as_any()
.downcast_ref::<Literal>()
.unwrap()
.value();
let arg = args[0].clone();
let n = args[1].as_any().downcast_ref::<Literal>().unwrap().value();
let n: i64 = n
.clone()
.try_into()
Expand All @@ -181,14 +172,12 @@ fn create_built_in_window_expr(
Arc::new(NthValue::nth(name, arg, data_type, n)?)
}
BuiltInWindowFunction::FirstValue => {
let arg =
coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
let arg = args[0].clone();
let data_type = args[0].data_type(input_schema)?;
Arc::new(NthValue::first(name, arg, data_type))
}
BuiltInWindowFunction::LastValue => {
let arg =
coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
let arg = args[0].clone();
let data_type = args[0].data_type(input_schema)?;
Arc::new(NthValue::last(name, arg, data_type))
}
Expand Down
32 changes: 30 additions & 2 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,9 @@ pub fn create_physical_fun(
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::try_cast;
use crate::expressions::{col, lit};
use crate::from_slice::FromSlice;
use crate::type_coercion::coerce;
use arrow::{
array::{
Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
Expand All @@ -822,6 +822,8 @@ mod tests {
};
use datafusion_common::cast::as_uint64_array;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::functions::data_types;
use datafusion_expr::Signature;

/// $FUNC function to test
/// $ARGS arguments (vec) to pass to function
Expand Down Expand Up @@ -2885,7 +2887,33 @@ mod tests {
Ok(())
}

// Helper function
// Helper function just for testing.
// Returns `expressions` coerced to types compatible with
// `signature`, if possible.
pub fn coerce(
expressions: &[Arc<dyn PhysicalExpr>],
schema: &Schema,
signature: &Signature,
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
if expressions.is_empty() {
return Ok(vec![]);
}

let current_types = expressions
.iter()
.map(|e| e.data_type(schema))
.collect::<Result<Vec<_>>>()?;

let new_types = data_types(&current_types, signature)?;

expressions
.iter()
.enumerate()
.map(|(i, expr)| try_cast(expr.clone(), schema, new_types[i].clone()))
.collect::<Result<Vec<_>>>()
}

// Helper function just for testing.
// The type coercion will be done in the logical phase, should do the type coercion for the test
fn create_physical_expr_with_type_coercion(
fun: &BuiltinScalarFunction,
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ mod sort_expr;
pub mod string_expressions;
pub mod struct_expressions;
pub mod tree_node;
pub mod type_coercion;
pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl PhysicalSortExpr {

/// Represents sort requirement associated with a plan
///
/// If the requirement incudes [`SortOptions`] then both the
/// If the requirement includes [`SortOptions`] then both the
/// expression *and* the sort options must match.
///
/// If the requirement does not include [`SortOptions`]) then only the
Expand Down
201 changes: 0 additions & 201 deletions datafusion/physical-expr/src/type_coercion.rs

This file was deleted.

0 comments on commit 12b88ea

Please sign in to comment.