From 86548e3390b6547523f50bd0f0946787b215f7ef Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 1 May 2024 13:08:56 -0400 Subject: [PATCH] Clean up API, make examples easier --- datafusion-examples/examples/expr_api.rs | 20 +++--- datafusion/common/src/dfschema.rs | 20 ++++++ datafusion/core/src/execution/context/mod.rs | 60 ++++++++++++----- datafusion/core/src/test_util/parquet.rs | 2 +- .../optimizer/src/analyzer/type_coercion.rs | 64 +++++++++---------- .../simplify_expressions/expr_simplifier.rs | 6 +- 6 files changed, 110 insertions(+), 62 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index edf49190bd63a..558b79125a221 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::DFSchema; use datafusion::error::Result; use datafusion::optimizer::simplify_expressions::ExprSimplifier; -use datafusion::physical_expr::{ - analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, -}; +use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; @@ -92,8 +90,8 @@ fn evaluate_demo() -> Result<()> { let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8))); // First, you make a "physical expression" from the logical `Expr` - let ctx = SessionContext::new(); - let physical_expr = physical_expr(&batch.schema(), expr)?; + let df_schema = DFSchema::try_from(batch.schema())?; + let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?; // Now, you can evaluate the expression against the RecordBatch let result = physical_expr.evaluate(&batch)?; @@ -214,7 +212,7 @@ fn range_analysis_demo() -> Result<()> { // `date < '2020-10-01' AND date > '2020-09-01'` // As always, we need to tell DataFusion the type of column "date" - let schema = Schema::new(vec![make_field("date", DataType::Date32)]); + let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)])); // You can provide DataFusion any known boundaries on the values of `date` // (for example, maybe you know you only have data up to `2020-09-15`), but @@ -223,9 +221,13 @@ fn range_analysis_demo() -> Result<()> { let boundaries = ExprBoundaries::try_new_unbounded(&schema)?; // Now, we invoke the analysis code to perform the range analysis - let physical_expr = physical_expr(&schema, expr)?; - let analysis_result = - analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?; + let df_schema = DFSchema::try_from(schema)?; + let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?; + let analysis_result = analyze( + &physical_expr, + AnalysisContext::new(boundaries), + df_schema.as_ref(), + )?; // The results of the analysis is an range, encoded as an `Interval`, for // each column in the schema, that must be true in order for the predicate diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b2a3de72356c2..d097623b560c7 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -125,6 +125,20 @@ impl DFSchema { } } + /// Return a reference to the inner Arrow [`Schema`] + /// + /// Note this does not have the qualifier information + pub fn as_arrow(&self) -> &Schema { + self.inner.as_ref() + } + + /// Return a reference to the inner Arrow [`SchemaRef`] + /// + /// Note this does not have the qualifier information + pub fn inner(&self) -> &SchemaRef { + &self.inner + } + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier pub fn new_with_metadata( qualified_fields: Vec<(Option, Arc)>, @@ -806,6 +820,12 @@ impl From<&DFSchema> for Schema { } } +impl AsRef for DFSchema { + fn as_ref(&self) -> &Schema { + self.as_arrow() + } +} + /// Create a `DFSchema` from an Arrow schema impl TryFrom for DFSchema { type Error = DataFusionError; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 26359e8311d05..4df7366d2e629 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -70,13 +70,13 @@ use datafusion_common::{ config::{ConfigExtension, TableOptions}, exec_err, not_impl_err, plan_datafusion_err, plan_err, tree_node::{TreeNodeRecursion, TreeNodeVisitor}, - DFSchemaRef, SchemaReference, TableReference, + DFSchema, SchemaReference, TableReference, }; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, var_provider::is_system_variables, - Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, + Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; use datafusion_sql::{ parser::{CopyToSource, CopyToStatement, DFParser}, @@ -96,7 +96,7 @@ pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_rewriter::FunctionRewrite; -use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_physical_expr::create_physical_expr; @@ -520,10 +520,10 @@ impl SessionContext { /// examples. pub fn create_physical_expr( &self, - schema: impl Into, + df_schema: &DFSchema, expr: Expr, ) -> Result> { - self.state.read().create_physical_expr(schema, expr) + self.state.read().create_physical_expr(df_schema, expr) } // return an empty dataframe @@ -1966,7 +1966,9 @@ impl SessionState { } /// Creates a [`PhysicalExpr`] from an [`Expr`] after applying type - /// coercion, simplifications, and function rewrites. + /// coercion, and function rewrites. + /// + /// Note that no simplification (TODO link) is applied. /// /// TODO links to coercsion, simplificiation, and rewrites /// @@ -1976,21 +1978,20 @@ impl SessionState { /// ``` pub fn create_physical_expr( &self, - schema: impl Into, + // todo make this schema + df_schema: &DFSchema, expr: Expr, ) -> Result> { - let df_schema = schema.into(); - // Simplify - let props = ExecutionProps::new(); - let simplifier = ExprSimplifier::new( - SimplifyContext::new(&props).with_schema(df_schema.clone()), - ); + let simplifier = + ExprSimplifier::new(SessionSimpifyProvider::new(self, df_schema)); // apply type coercion here to ensure types match - let expr = simplifier.coerce(expr, df_schema.clone())?; + let expr = simplifier.coerce(expr, df_schema)?; + // TODO should we also simplify the expression? + // simplifier.simplify() - create_physical_expr(&expr, df_schema.as_ref(), &props) + create_physical_expr(&expr, df_schema, self.execution_props()) } /// Return the session ID @@ -2070,6 +2071,35 @@ impl SessionState { } } +struct SessionSimpifyProvider<'a> { + state: &'a SessionState, + df_schema: &'a DFSchema, +} + +impl<'a> SessionSimpifyProvider<'a> { + fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { + Self { state, df_schema } + } +} + +impl<'a> SimplifyInfo for SessionSimpifyProvider<'a> { + fn is_boolean_type(&self, expr: &Expr) -> Result { + Ok(expr.get_type(self.df_schema)? == DataType::Boolean) + } + + fn nullable(&self, expr: &Expr) -> Result { + expr.nullable(self.df_schema) + } + + fn execution_props(&self) -> &ExecutionProps { + self.state.execution_props() + } + + fn get_data_type(&self, expr: &Expr) -> Result { + expr.get_type(self.df_schema) + } +} + struct SessionContextProvider<'a> { state: &'a SessionState, tables: HashMap>, diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 8113d799a184d..92e65fd6a3eab 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -168,7 +168,7 @@ impl TestParquetFile { let parquet_options = ctx.copied_table_options().parquet; if let Some(filter) = maybe_filter { let simplifier = ExprSimplifier::new(context); - let filter = simplifier.coerce(filter, df_schema.clone()).unwrap(); + let filter = simplifier.coerce(filter, &df_schema).unwrap(); let physical_filter_expr = create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?; let parquet_exec = Arc::new(ParquetExec::new( diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index b7f95d83e8fc7..adb0371cbb616 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -25,7 +25,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, - DFSchemaRef, DataFusionError, Result, ScalarValue, + DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList, @@ -99,9 +99,7 @@ fn analyze_internal( // select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3) schema.merge(external_schema); - let mut expr_rewrite = TypeCoercionRewriter { - schema: Arc::new(schema), - }; + let mut expr_rewrite = TypeCoercionRewriter { schema: &schema }; let new_expr = plan .expressions() @@ -116,11 +114,11 @@ fn analyze_internal( plan.with_new_exprs(new_expr, new_inputs) } -pub(crate) struct TypeCoercionRewriter { - pub(crate) schema: DFSchemaRef, +pub(crate) struct TypeCoercionRewriter<'a> { + pub(crate) schema: &'a DFSchema, } -impl TreeNodeRewriter for TypeCoercionRewriter { +impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { type Node = Expr; fn f_up(&mut self, expr: Expr) -> Result> { @@ -154,7 +152,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter { negated, }) => { let new_plan = analyze_internal(&self.schema, &subquery.subquery)?; - let expr_type = expr.get_type(&self.schema)?; + let expr_type = expr.get_type(self.schema)?; let subquery_type = new_plan.schema().field(0).data_type(); let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!( "expr type {expr_type:?} can't cast to {subquery_type:?} in InSubquery" @@ -165,14 +163,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter { outer_ref_columns: subquery.outer_ref_columns, }; Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( - Box::new(expr.cast_to(&common_type, &self.schema)?), + Box::new(expr.cast_to(&common_type, self.schema)?), cast_subquery(new_subquery, &common_type)?, negated, )))) } Expr::Not(expr) => Ok(Transformed::yes(not(get_casted_expr_for_bool_op( *expr, - &self.schema, + self.schema, )?))), Expr::IsTrue(expr) => Ok(Transformed::yes(is_true( get_casted_expr_for_bool_op(*expr, &self.schema)?, @@ -199,8 +197,8 @@ impl TreeNodeRewriter for TypeCoercionRewriter { escape_char, case_insensitive, }) => { - let left_type = expr.get_type(&self.schema)?; - let right_type = pattern.get_type(&self.schema)?; + let left_type = expr.get_type(self.schema)?; + let right_type = pattern.get_type(self.schema)?; let coerced_type = like_coercion(&left_type, &right_type).ok_or_else(|| { let op_name = if case_insensitive { "ILIKE" @@ -211,8 +209,8 @@ impl TreeNodeRewriter for TypeCoercionRewriter { "There isn't a common type to coerce {left_type} and {right_type} in {op_name} expression" ) })?; - let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?); - let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?); + let expr = Box::new(expr.cast_to(&coerced_type, self.schema)?); + let pattern = Box::new(pattern.cast_to(&coerced_type, self.schema)?); Ok(Transformed::yes(Expr::Like(Like::new( negated, expr, @@ -223,14 +221,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter { } Expr::BinaryExpr(BinaryExpr { left, op, right }) => { let (left_type, right_type) = get_input_types( - &left.get_type(&self.schema)?, + &left.get_type(self.schema)?, &op, - &right.get_type(&self.schema)?, + &right.get_type(self.schema)?, )?; Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( - Box::new(left.cast_to(&left_type, &self.schema)?), + Box::new(left.cast_to(&left_type, self.schema)?), op, - Box::new(right.cast_to(&right_type, &self.schema)?), + Box::new(right.cast_to(&right_type, self.schema)?), )))) } Expr::Between(Between { @@ -239,15 +237,15 @@ impl TreeNodeRewriter for TypeCoercionRewriter { low, high, }) => { - let expr_type = expr.get_type(&self.schema)?; - let low_type = low.get_type(&self.schema)?; + let expr_type = expr.get_type(self.schema)?; + let low_type = low.get_type(self.schema)?; let low_coerced_type = comparison_coercion(&expr_type, &low_type) .ok_or_else(|| { DataFusionError::Internal(format!( "Failed to coerce types {expr_type} and {low_type} in BETWEEN expression" )) })?; - let high_type = high.get_type(&self.schema)?; + let high_type = high.get_type(self.schema)?; let high_coerced_type = comparison_coercion(&expr_type, &low_type) .ok_or_else(|| { DataFusionError::Internal(format!( @@ -262,10 +260,10 @@ impl TreeNodeRewriter for TypeCoercionRewriter { )) })?; Ok(Transformed::yes(Expr::Between(Between::new( - Box::new(expr.cast_to(&coercion_type, &self.schema)?), + Box::new(expr.cast_to(&coercion_type, self.schema)?), negated, - Box::new(low.cast_to(&coercion_type, &self.schema)?), - Box::new(high.cast_to(&coercion_type, &self.schema)?), + Box::new(low.cast_to(&coercion_type, self.schema)?), + Box::new(high.cast_to(&coercion_type, self.schema)?), )))) } Expr::InList(InList { @@ -273,10 +271,10 @@ impl TreeNodeRewriter for TypeCoercionRewriter { list, negated, }) => { - let expr_data_type = expr.get_type(&self.schema)?; + let expr_data_type = expr.get_type(self.schema)?; let list_data_types = list .iter() - .map(|list_expr| list_expr.get_type(&self.schema)) + .map(|list_expr| list_expr.get_type(self.schema)) .collect::>>()?; let result_type = get_coerce_type_for_list(&expr_data_type, &list_data_types); @@ -286,11 +284,11 @@ impl TreeNodeRewriter for TypeCoercionRewriter { ), Some(coerced_type) => { // find the coerced type - let cast_expr = expr.cast_to(&coerced_type, &self.schema)?; + let cast_expr = expr.cast_to(&coerced_type, self.schema)?; let cast_list_expr = list .into_iter() .map(|list_expr| { - list_expr.cast_to(&coerced_type, &self.schema) + list_expr.cast_to(&coerced_type, self.schema) }) .collect::>>()?; Ok(Transformed::yes(Expr::InList(InList ::new( @@ -302,7 +300,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter { } } Expr::Case(case) => { - let case = coerce_case_expression(case, &self.schema)?; + let case = coerce_case_expression(case, self.schema)?; Ok(Transformed::yes(Expr::Case(case))) } Expr::ScalarFunction(ScalarFunction { func_def, args }) => match func_def { @@ -375,7 +373,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter { null_treatment, }) => { let window_frame = - coerce_window_frame(window_frame, &self.schema, &order_by)?; + coerce_window_frame(window_frame, self.schema, &order_by)?; let args = match &fun { expr::WindowFunctionDefinition::AggregateFunction(fun) => { @@ -495,7 +493,7 @@ fn coerce_frame_bound( // For example, ROWS and GROUPS frames use `UInt64` during calculations. fn coerce_window_frame( window_frame: WindowFrame, - schema: &DFSchemaRef, + schema: &DFSchema, expressions: &[Expr], ) -> Result { let mut window_frame = window_frame; @@ -531,7 +529,7 @@ fn coerce_window_frame( // Support the `IsTrue` `IsNotTrue` `IsFalse` `IsNotFalse` type coercion. // The above op will be rewrite to the binary op when creating the physical op. -fn get_casted_expr_for_bool_op(expr: Expr, schema: &DFSchemaRef) -> Result { +fn get_casted_expr_for_bool_op(expr: Expr, schema: &DFSchema) -> Result { let left_type = expr.get_type(schema)?; get_input_types(&left_type, &Operator::IsDistinctFrom, &DataType::Boolean)?; expr.cast_to(&DataType::Boolean, schema) @@ -615,7 +613,7 @@ fn coerce_agg_exprs_for_signature( .collect() } -fn coerce_case_expression(case: Case, schema: &DFSchemaRef) -> Result { +fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { // Given expressions like: // // CASE a1 diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index fb5125f097692..8bbbdcbaaeb8e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -31,9 +31,7 @@ use datafusion_common::{ cast::{as_large_list_array, as_list_array}, tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, }; -use datafusion_common::{ - internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, -}; +use datafusion_common::{internal_err, DFSchema, DataFusionError, Result, ScalarValue}; use datafusion_expr::expr::{InList, InSubquery}; use datafusion_expr::simplify::ExprSimplifyResult; use datafusion_expr::{ @@ -213,7 +211,7 @@ impl ExprSimplifier { // rather than creating an DFSchemaRef coerces rather than doing // it manually. // https://github.com/apache/datafusion/issues/3793 - pub fn coerce(&self, expr: Expr, schema: DFSchemaRef) -> Result { + pub fn coerce(&self, expr: Expr, schema: &DFSchema) -> Result { let mut expr_rewrite = TypeCoercionRewriter { schema }; expr.rewrite(&mut expr_rewrite).data()