Skip to content

Commit

Permalink
s
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Feb 6, 2022
1 parent 1f12ba0 commit 06a5ee4
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 366 deletions.
296 changes: 292 additions & 4 deletions datafusion-expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

//! Expressions
use crate::field_util::get_indexed_field;
use crate::operator::Operator;
use crate::window_frame;
use crate::window_function;
use arrow::{compute::can_cast_types, datatypes::DataType};
use datafusion_common::{
DFField, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue,
Column, DFField, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue,
};
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
Expand Down Expand Up @@ -183,7 +186,7 @@ pub enum Expr {
/// Represents the call of an aggregate built-in function with arguments.
AggregateFunction {
/// Name of the function
fun: aggregates::AggregateFunction,
fun: aggregate::AggregateFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// Whether this is a DISTINCT aggregation or not
Expand All @@ -192,15 +195,15 @@ pub enum Expr {
/// Represents the call of a window function with arguments.
WindowFunction {
/// Name of the function
fun: window_functions::WindowFunction,
fun: window_function::WindowFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// List of partition by expressions
partition_by: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
window_frame: Option<window_frames::WindowFrame>,
window_frame: Option<window_frame::WindowFrame>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -981,3 +984,288 @@ pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
right: Box::new(r),
}
}

impl fmt::Debug for Expr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
Expr::Column(c) => write!(f, "{}", c),
Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")),
Expr::Literal(v) => write!(f, "{:?}", v),
Expr::Case {
expr,
when_then_expr,
else_expr,
..
} => {
write!(f, "CASE ")?;
if let Some(e) = expr {
write!(f, "{:?} ", e)?;
}
for (w, t) in when_then_expr {
write!(f, "WHEN {:?} THEN {:?} ", w, t)?;
}
if let Some(e) = else_expr {
write!(f, "ELSE {:?} ", e)?;
}
write!(f, "END")
}
Expr::Cast { expr, data_type } => {
write!(f, "CAST({:?} AS {:?})", expr, data_type)
}
Expr::TryCast { expr, data_type } => {
write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type)
}
Expr::Not(expr) => write!(f, "NOT {:?}", expr),
Expr::Negative(expr) => write!(f, "(- {:?})", expr),
Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
Expr::Sort {
expr,
asc,
nulls_first,
} => {
if *asc {
write!(f, "{:?} ASC", expr)?;
} else {
write!(f, "{:?} DESC", expr)?;
}
if *nulls_first {
write!(f, " NULLS FIRST")
} else {
write!(f, " NULLS LAST")
}
}
Expr::ScalarFunction { fun, args, .. } => {
fmt_function(f, &fun.to_string(), false, args, false)
}
Expr::ScalarUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args, false)
}
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => {
fmt_function(f, &fun.to_string(), false, args, false)?;
if !partition_by.is_empty() {
write!(f, " PARTITION BY {:?}", partition_by)?;
}
if !order_by.is_empty() {
write!(f, " ORDER BY {:?}", order_by)?;
}
if let Some(window_frame) = window_frame {
write!(
f,
" {} BETWEEN {} AND {}",
window_frame.units,
window_frame.start_bound,
window_frame.end_bound
)?;
}
Ok(())
}
Expr::AggregateFunction {
fun,
distinct,
ref args,
..
} => fmt_function(f, &fun.to_string(), *distinct, args, true),
Expr::AggregateUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args, false)
}
Expr::Between {
expr,
negated,
low,
high,
} => {
if *negated {
write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high)
} else {
write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high)
}
}
Expr::InList {
expr,
list,
negated,
} => {
if *negated {
write!(f, "{:?} NOT IN ({:?})", expr, list)
} else {
write!(f, "{:?} IN ({:?})", expr, list)
}
}
Expr::Wildcard => write!(f, "*"),
Expr::GetIndexedField { ref expr, key } => {
write!(f, "({:?})[{}]", expr, key)
}
}
}
}

/// Returns a readable name of an expression based on the input schema.
/// This function recursively transverses the expression for names such as "CAST(a > 2)".
fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
match e {
Expr::Alias(_, name) => Ok(name.clone()),
Expr::Column(c) => Ok(c.flat_name()),
Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{:?}", value)),
Expr::BinaryExpr { left, op, right } => {
let left = create_name(left, input_schema)?;
let right = create_name(right, input_schema)?;
Ok(format!("{} {} {}", left, op, right))
}
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
let mut name = "CASE ".to_string();
if let Some(e) = expr {
let e = create_name(e, input_schema)?;
name += &format!("{} ", e);
}
for (w, t) in when_then_expr {
let when = create_name(w, input_schema)?;
let then = create_name(t, input_schema)?;
name += &format!("WHEN {} THEN {} ", when, then);
}
if let Some(e) = else_expr {
let e = create_name(e, input_schema)?;
name += &format!("ELSE {} ", e);
}
name += "END";
Ok(name)
}
Expr::Cast { expr, data_type } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("CAST({} AS {:?})", expr, data_type))
}
Expr::TryCast { expr, data_type } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
}
Expr::Not(expr) => {
let expr = create_name(expr, input_schema)?;
Ok(format!("NOT {}", expr))
}
Expr::Negative(expr) => {
let expr = create_name(expr, input_schema)?;
Ok(format!("(- {})", expr))
}
Expr::IsNull(expr) => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NULL", expr))
}
Expr::IsNotNull(expr) => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NOT NULL", expr))
}
Expr::GetIndexedField { expr, key } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{}[{}]", expr, key))
}
Expr::ScalarFunction { fun, args, .. } => {
create_function_name(&fun.to_string(), false, args, input_schema)
}
Expr::ScalarUDF { fun, args, .. } => {
create_function_name(&fun.name, false, args, input_schema)
}
Expr::WindowFunction {
fun,
args,
window_frame,
partition_by,
order_by,
} => {
let mut parts: Vec<String> = vec![create_function_name(
&fun.to_string(),
false,
args,
input_schema,
)?];
if !partition_by.is_empty() {
parts.push(format!("PARTITION BY {:?}", partition_by));
}
if !order_by.is_empty() {
parts.push(format!("ORDER BY {:?}", order_by));
}
if let Some(window_frame) = window_frame {
parts.push(format!("{}", window_frame));
}
Ok(parts.join(" "))
}
Expr::AggregateFunction {
fun,
distinct,
args,
..
} => create_function_name(&fun.to_string(), *distinct, args, input_schema),
Expr::AggregateUDF { fun, args } => {
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(create_name(e, input_schema)?);
}
Ok(format!("{}({})", fun.name, names.join(",")))
}
Expr::InList {
expr,
list,
negated,
} => {
let expr = create_name(expr, input_schema)?;
let list = list.iter().map(|expr| create_name(expr, input_schema));
if *negated {
Ok(format!("{} NOT IN ({:?})", expr, list))
} else {
Ok(format!("{} IN ({:?})", expr, list))
}
}
Expr::Between {
expr,
negated,
low,
high,
} => {
let expr = create_name(expr, input_schema)?;
let low = create_name(low, input_schema)?;
let high = create_name(high, input_schema)?;
if *negated {
Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high))
} else {
Ok(format!("{} BETWEEN {} AND {}", expr, low, high))
}
}
Expr::Sort { .. } => Err(DataFusionError::Internal(
"Create name does not support sort expression".to_string(),
)),
Expr::Wildcard => Err(DataFusionError::Internal(
"Create name does not support wildcard".to_string(),
)),
}
}

fn create_function_name(
fun: &str,
distinct: bool,
args: &[Expr],
input_schema: &DFSchema,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_name(e, input_schema))
.collect::<Result<_>>()?;
let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
}
Loading

0 comments on commit 06a5ee4

Please sign in to comment.