Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Expr::GetIndexedField to use a struct #3838

Merged
merged 9 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NOT UNKNOWN", expr))
}
Expr::GetIndexedField { expr, key } => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{}[{}]", expr, key))
Expr::GetIndexedField(get_indexed_field) => {
let expr = create_physical_name(&get_indexed_field.expr, false)?;
Ok(format!("{}[{}]", expr, get_indexed_field.key))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend this style so that, if we ever change `GetIndexedField`, the compiler will tell us all the places where a new field may be needed.

Suggested change
Expr::GetIndexedField(get_indexed_field) => {
let expr = create_physical_name(&get_indexed_field.expr, false)?;
Ok(format!("{}[{}]", expr, get_indexed_field.key))
Expr::GetIndexedField(GetIndexedField { expr, key }) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{}[{}]", expr, key))

}
Expr::ScalarFunction { fun, args, .. } => {
create_function_physical_name(&fun.to_string(), false, args)
Expand Down
29 changes: 18 additions & 11 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,7 @@ pub enum Expr {
/// arithmetic negation of an expression, the operand must be of a signed numeric data type
Negative(Box<Expr>),
/// Returns the field of a [`arrow::array::ListArray`] or [`arrow::array::StructArray`] by key
GetIndexedField {
/// the expression to take the field from
expr: Box<Expr>,
/// The name of the field to take
key: ScalarValue,
},
GetIndexedField(GetIndexedField),
/// Whether an expression is between a given range.
Between {
/// The value to compare
Expand Down Expand Up @@ -285,6 +280,14 @@ pub enum Expr {
GroupingSet(GroupingSet),
}

/// Payload of the [`Expr::GetIndexedField`] variant: a field access on an
/// expression, identified by a key.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GetIndexedField {
/// The expression to take the field from
pub expr: Box<Expr>,
/// The key of the field to take
// NOTE(review): presumably a field name for struct values and an index for
// list values; confirm against the `get_indexed_field` helper.
pub key: ScalarValue,
}

/// CASE expression
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Case {
Expand Down Expand Up @@ -831,8 +834,12 @@ impl fmt::Debug for Expr {
}
Expr::Wildcard => write!(f, "*"),
Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier),
Expr::GetIndexedField { ref expr, key } => {
write!(f, "({:?})[{}]", expr, key)
Expr::GetIndexedField(get_indexed_field) => {
write!(
f,
"({:?})[{}]",
get_indexed_field.expr, get_indexed_field.key
)
}
Expr::GroupingSet(grouping_sets) => match grouping_sets {
GroupingSet::Rollup(exprs) => {
Expand Down Expand Up @@ -1059,9 +1066,9 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).name().clone())
}
Expr::GetIndexedField { expr, key } => {
let expr = create_name(expr)?;
Ok(format!("{}[{}]", expr, key))
Expr::GetIndexedField(get_indexed_field) => {
let expr = create_name(&get_indexed_field.expr)?;
Ok(format!("{}[{}]", expr, get_indexed_field.key))
}
Expr::ScalarFunction { fun, args, .. } => {
create_function_name(&fun.to_string(), false, args)
Expand Down
12 changes: 7 additions & 5 deletions datafusion/expr/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression rewriter

use crate::expr::{Case, GroupingSet};
use crate::expr::{Case, GetIndexedField, GroupingSet};
use crate::logical_plan::{Aggregate, Projection};
use crate::utils::{from_plan, grouping_set_to_exprlist};
use crate::{Expr, ExprSchemable, LogicalPlan};
Expand Down Expand Up @@ -284,10 +284,12 @@ impl ExprRewritable for Expr {
Expr::QualifiedWildcard { qualifier } => {
Expr::QualifiedWildcard { qualifier }
}
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
expr: rewrite_boxed(expr, rewriter)?,
key,
},
Expr::GetIndexedField(get_indexed_field) => {
Expr::GetIndexedField(GetIndexedField {
expr: rewrite_boxed(get_indexed_field.expr, rewriter)?,
key: get_indexed_field.key,
})
}
};

// now rewrite this expression itself
Expand Down
13 changes: 7 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ impl ExprSchemable for Expr {
// grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::GetIndexedField { ref expr, key } => {
let data_type = expr.get_type(schema)?;
Expr::GetIndexedField(indexed_field) => {
let data_type = indexed_field.expr.get_type(schema)?;

get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
get_indexed_field(&data_type, &indexed_field.key)
.map(|x| x.data_type().clone())
}
}
}
Expand Down Expand Up @@ -217,9 +218,9 @@ impl ExprSchemable for Expr {
"QualifiedWildcard expressions are not valid in a logical query plan"
.to_owned(),
)),
Expr::GetIndexedField { ref expr, key } => {
let data_type = expr.get_type(input_schema)?;
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
Expr::GetIndexedField(indexed_field) => {
let data_type = indexed_field.expr.get_type(input_schema)?;
get_indexed_field(&data_type, &indexed_field.key).map(|x| x.is_nullable())
}
Expr::GroupingSet(_) => {
// grouping sets do not really have the concept of nullable and do not appear
Expand Down
6 changes: 4 additions & 2 deletions datafusion/expr/src/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ impl ExprVisitable for Expr {
| Expr::Cast { expr, .. }
| Expr::TryCast { expr, .. }
| Expr::Sort { expr, .. }
| Expr::InSubquery { expr, .. }
| Expr::GetIndexedField { expr, .. } => expr.accept(visitor),
| Expr::InSubquery { expr, .. } => expr.accept(visitor),
Expr::GetIndexedField(get_indexed_field) => {
get_indexed_field.expr.accept(visitor)
}
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => exprs
.iter()
.fold(Ok(visitor), |v, e| v.and_then(|v| e.accept(v))),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub use accumulator::{Accumulator, AggregateState};
pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
pub use columnar_value::{ColumnarValue, NullColumnarValue};
pub use expr::{Case, Expr, GroupingSet};
pub use expr::{Case, Expr, GetIndexedField, GroupingSet};
pub use expr_fn::*;
pub use expr_schema::ExprSchemable;
pub use function::{
Expand Down
15 changes: 11 additions & 4 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,17 @@ pub fn create_physical_expr(
input_schema,
execution_props,
)?),
Expr::GetIndexedField { expr, key } => Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(expr, input_dfschema, input_schema, execution_props)?,
key.clone(),
))),
Expr::GetIndexedField(get_indexed_field) => {
Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(
&get_indexed_field.expr,
input_dfschema,
input_schema,
execution_props,
)?,
get_indexed_field.key.clone(),
)))
}

Expr::ScalarFunction { fun, args } => {
let physical_args = args
Expand Down
6 changes: 3 additions & 3 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_expr::{
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, upper, AggregateFunction, BuiltInWindowFunction, BuiltinScalarFunction,
Case, Expr, GroupingSet,
Case, Expr, GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
Operator, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
Expand Down Expand Up @@ -800,10 +800,10 @@ pub fn parse_expr(

let expr = parse_required_expr(&field.expr, registry, "expr")?;

Ok(Expr::GetIndexedField {
Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(expr),
key,
})
}))
}
ExprType::Column(column) => Ok(Expr::Column(column.into())),
ExprType::Literal(literal) => {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
// see discussion in https://github.com/apache/arrow-datafusion/issues/2565
return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists { .. } not supported".to_string()))
}
Expr::GetIndexedField { key, expr } => Self {
Expr::GetIndexedField(get_indexed_field) =>
Self {
expr_type: Some(ExprType::GetIndexedField(Box::new(
protobuf::GetIndexedField {
key: Some(key.try_into()?),
expr: Some(Box::new(expr.as_ref().try_into()?)),
key: Some((&get_indexed_field.key).try_into()?),
expr: Some(Box::new(get_indexed_field.expr.as_ref().try_into()?)),
},
))),
},
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use datafusion_expr::utils::{
COUNT_STAR_EXPANSION,
};
use datafusion_expr::{
and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, Operator,
ScalarUDF, WindowFrame, WindowFrameUnits,
and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, GetIndexedField,
Operator, ScalarUDF, WindowFrame, WindowFrameUnits,
};
use datafusion_expr::{
window_function::WindowFunction, BuiltinScalarFunction, TableSource,
Expand Down Expand Up @@ -123,10 +123,10 @@ fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
expr
};

Ok(Expr::GetIndexedField {
Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(expr),
key: plan_key(key)?,
})
}))
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand Down Expand Up @@ -1834,10 +1834,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Err(_) => {
if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) {
// Access to a field of a column which is a structure, example: SELECT my_struct.key
Ok(Expr::GetIndexedField {
Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(Expr::Column(field.qualified_column())),
key: ScalarValue::Utf8(Some(name)),
})
}))
} else {
// table.column identifier
Ok(Expr::Column(Column {
Expand Down
15 changes: 10 additions & 5 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE
use sqlparser::ast::Ident;

use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::{Case, GroupingSet};
use datafusion_expr::expr::{Case, GetIndexedField, GroupingSet};
use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
use datafusion_expr::{Expr, LogicalPlan};
use std::collections::HashMap;
Expand Down Expand Up @@ -375,10 +375,15 @@ where
}),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
expr: Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key: key.clone(),
}),
Expr::GetIndexedField(get_indexed_field) => {
Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(clone_with_replacement(
get_indexed_field.expr.as_ref(),
replacement_fn,
)?),
key: get_indexed_field.key.clone(),
}))
}
Expr::GroupingSet(set) => match set {
GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup(
exprs
Expand Down