Skip to content

Commit

Permalink
Simplify physical expression creation API (not require schema) (apache#8823)
Browse files Browse the repository at this point in the history

* simplify physical expression creation API
  • Loading branch information
comphead authored Jan 12, 2024
1 parent 8353a2c commit eb81ea2
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 275 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@ pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr
// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
create_physical_expr(&expr, df_schema.as_ref(), &props)
}
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ async fn prune_partitions(

// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,12 +662,8 @@ impl TableProvider for ListingTable {
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
&self.table_schema,
state.execution_props(),
)?;
let filters =
create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
Some(filters)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,6 @@ mod tests {
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,6 @@ mod test {
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ mod tests {
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}

#[tokio::test]
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,6 @@ mod tests {
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use crate::variable::VarType;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion_expr::Expr;
use std::env;
Expand Down Expand Up @@ -2504,7 +2503,6 @@ mod tests {
&self,
_expr: &Expr,
_input_dfschema: &crate::common::DFSchema,
_input_schema: &Schema,
_session_state: &SessionState,
) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
unimplemented!()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3214,6 +3214,6 @@ mod tests {
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}
}
114 changes: 14 additions & 100 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,10 @@ pub trait PhysicalPlanner: Send + Sync {
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `expr`
///
/// `input_schema`: the physical schema for evaluating `expr`
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>>;
}
Expand Down Expand Up @@ -467,21 +464,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `expr`
///
/// `input_schema`: the physical schema for evaluating `e`
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>> {
create_physical_expr(
expr,
input_dfschema,
input_schema,
session_state.execution_props(),
)
create_physical_expr(expr, input_dfschema, session_state.execution_props())
}
}

Expand Down Expand Up @@ -654,7 +643,6 @@ impl DefaultPhysicalPlanner {
self.create_physical_expr(
expr,
schema,
&exec_schema,
session_state,
)
})
Expand Down Expand Up @@ -694,7 +682,6 @@ impl DefaultPhysicalPlanner {
self.create_physical_expr(
e,
input.schema(),
&input_exec.schema(),
session_state,
)
})
Expand Down Expand Up @@ -884,7 +871,6 @@ impl DefaultPhysicalPlanner {
self.create_physical_expr(
e,
input_schema,
&input_exec.schema(),
session_state,
),
physical_name,
Expand All @@ -899,13 +885,11 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Filter(filter) => {
let physical_input = self.create_initial_plan(&filter.input, session_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = filter.input.schema();

let runtime_expr = self.create_physical_expr(
&filter.predicate,
input_dfschema,
&input_schema,
session_state,
)?;
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
Expand All @@ -922,7 +906,6 @@ impl DefaultPhysicalPlanner {
partitioning_scheme,
}) => {
let physical_input = self.create_initial_plan(input, session_state).await?;
let input_schema = physical_input.schema();
let input_dfschema = input.as_ref().schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Expand All @@ -935,7 +918,6 @@ impl DefaultPhysicalPlanner {
self.create_physical_expr(
e,
input_dfschema,
&input_schema,
session_state,
)
})
Expand All @@ -953,14 +935,12 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
let physical_input = self.create_initial_plan(input, session_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = input.as_ref().schema();
let sort_expr = expr
.iter()
.map(|e| create_physical_sort_expr(
e,
input_dfschema,
&input_schema,
session_state.execution_props(),
))
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -1107,7 +1087,6 @@ impl DefaultPhysicalPlanner {
let filter_expr = create_physical_expr(
expr,
&filter_df_schema,
&filter_schema,
session_state.execution_props(),
)?;
let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);
Expand Down Expand Up @@ -1348,12 +1327,7 @@ impl DefaultPhysicalPlanner {
)
}
expr => Ok(PhysicalGroupBy::new_single(vec![tuple_err((
self.create_physical_expr(
expr,
input_dfschema,
input_schema,
session_state,
),
self.create_physical_expr(expr, input_dfschema, session_state),
physical_name(expr),
))?])),
}
Expand All @@ -1363,12 +1337,7 @@ impl DefaultPhysicalPlanner {
.iter()
.map(|e| {
tuple_err((
self.create_physical_expr(
e,
input_dfschema,
input_schema,
session_state,
),
self.create_physical_expr(e, input_dfschema, session_state),
physical_name(e),
))
})
Expand Down Expand Up @@ -1406,7 +1375,6 @@ fn merge_grouping_set_physical_expr(
grouping_set_expr.push(get_physical_expr_pair(
expr,
input_dfschema,
input_schema,
session_state,
)?);

Expand Down Expand Up @@ -1461,12 +1429,7 @@ fn create_cube_physical_expr(
session_state,
)?);

all_exprs.push(get_physical_expr_pair(
expr,
input_dfschema,
input_schema,
session_state,
)?)
all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
}

let mut groups: Vec<Vec<bool>> = Vec::with_capacity(num_groups);
Expand Down Expand Up @@ -1509,12 +1472,7 @@ fn create_rollup_physical_expr(
session_state,
)?);

all_exprs.push(get_physical_expr_pair(
expr,
input_dfschema,
input_schema,
session_state,
)?)
all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
}

for total in 0..=num_of_exprs {
Expand All @@ -1541,12 +1499,8 @@ fn get_null_physical_expr_pair(
input_schema: &Schema,
session_state: &SessionState,
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
let physical_expr = create_physical_expr(
expr,
input_dfschema,
input_schema,
session_state.execution_props(),
)?;
let physical_expr =
create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
let physical_name = physical_name(&expr.clone())?;

let data_type = physical_expr.data_type(input_schema)?;
Expand All @@ -1559,15 +1513,10 @@ fn get_null_physical_expr_pair(
fn get_physical_expr_pair(
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
let physical_expr = create_physical_expr(
expr,
input_dfschema,
input_schema,
session_state.execution_props(),
)?;
let physical_expr =
create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
let physical_name = physical_name(expr)?;
Ok((physical_expr, physical_name))
}
Expand Down Expand Up @@ -1611,35 +1560,16 @@ pub fn create_window_expr_with_name(
}) => {
let args = args
.iter()
.map(|e| {
create_physical_expr(
e,
logical_input_schema,
physical_input_schema,
execution_props,
)
})
.map(|e| create_physical_expr(e, logical_input_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let partition_by = partition_by
.iter()
.map(|e| {
create_physical_expr(
e,
logical_input_schema,
physical_input_schema,
execution_props,
)
})
.map(|e| create_physical_expr(e, logical_input_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let order_by = order_by
.iter()
.map(|e| {
create_physical_sort_expr(
e,
logical_input_schema,
physical_input_schema,
execution_props,
)
create_physical_sort_expr(e, logical_input_schema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
if !is_window_valid(window_frame) {
Expand Down Expand Up @@ -1711,20 +1641,12 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
}) => {
let args = args
.iter()
.map(|e| {
create_physical_expr(
e,
logical_input_schema,
physical_input_schema,
execution_props,
)
})
.map(|e| create_physical_expr(e, logical_input_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let filter = match filter {
Some(e) => Some(create_physical_expr(
e,
logical_input_schema,
physical_input_schema,
execution_props,
)?),
None => None,
Expand All @@ -1736,7 +1658,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
create_physical_sort_expr(
expr,
logical_input_schema,
physical_input_schema,
execution_props,
)
})
Expand Down Expand Up @@ -1804,7 +1725,6 @@ pub fn create_aggregate_expr_and_maybe_filter(
pub fn create_physical_sort_expr(
e: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
if let Expr::Sort(expr::Sort {
Expand All @@ -1814,12 +1734,7 @@ pub fn create_physical_sort_expr(
}) = e
{
Ok(PhysicalSortExpr {
expr: create_physical_expr(
expr,
input_dfschema,
input_schema,
execution_props,
)?,
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
Expand Down Expand Up @@ -2180,7 +2095,6 @@ mod tests {
let expr = planner.create_physical_expr(
&col("a").not(),
&dfschema,
&schema,
&make_session_state(),
)?;
let expected = expressions::not(expressions::col("a", &schema)?)?;
Expand Down
Loading

0 comments on commit eb81ea2

Please sign in to comment.