From d26b54c4036117f3c5f2841008d1c0968641c5e1 Mon Sep 17 00:00:00 2001
From: Qingping Hou
Date: Sun, 20 Jun 2021 14:20:30 -0700
Subject: [PATCH] fix window schema handling in projection pushdown optimizer

---
 .../src/optimizer/projection_push_down.rs | 32 +++++++------------
 datafusion/src/physical_plan/planner.rs   |  6 ++--
 datafusion/src/physical_plan/windows.rs   |  2 +-
 integration-tests/test_psql_parity.py     |  2 +-
 4 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 17186972d7db..2544d89d0492 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -21,7 +21,8 @@
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{
-    build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema,
+    build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
+    LogicalPlanBuilder, ToDFSchema,
 };
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -253,26 +254,15 @@ fn optimize_plan(
                 &mut new_required_columns,
             )?;
 
-            let new_schema = DFSchema::new(
-                schema
-                    .fields()
-                    .iter()
-                    .filter(|f| new_required_columns.contains(&f.qualified_column()))
-                    .cloned()
-                    .collect(),
-            )?;
-
-            Ok(LogicalPlan::Window {
-                window_expr: new_window_expr,
-                input: Arc::new(optimize_plan(
-                    optimizer,
-                    input,
-                    &new_required_columns,
-                    true,
-                    execution_props,
-                )?),
-                schema: DFSchemaRef::new(new_schema),
-            })
+            LogicalPlanBuilder::from(&optimize_plan(
+                optimizer,
+                input,
+                &new_required_columns,
+                true,
+                execution_props,
+            )?)
+            .window(new_window_expr)?
+            .build()
         }
         LogicalPlan::Aggregate {
             schema,
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 40719c000a5b..902b122e9b4d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -265,10 +265,8 @@ impl DefaultPhysicalPlanner {
                 }
 
                 let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = input_exec.schema();
-
+                let physical_input_schema = input_exec.schema();
                 let logical_input_schema = input.as_ref().schema();
-                let physical_input_schema = input_exec.as_ref().schema();
 
                 let window_expr = window_expr
                     .iter()
@@ -285,7 +283,7 @@ impl DefaultPhysicalPlanner {
                 Ok(Arc::new(WindowAggExec::try_new(
                     window_expr,
                     input_exec.clone(),
-                    input_schema,
+                    physical_input_schema,
                 )?))
             }
             LogicalPlan::Aggregate {
diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
index fdea92c922f5..cb1dda738bd4 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -320,7 +320,7 @@ impl WindowAggExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
     ) -> Result<Self> {
-        let schema = create_schema(&input.schema(), &window_expr)?;
+        let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
         Ok(WindowAggExec {
             input,
diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py
index 4e0878c24b81..10ff5055e6f7 100644
--- a/integration-tests/test_psql_parity.py
+++ b/integration-tests/test_psql_parity.py
@@ -83,7 +83,7 @@ def test_parity(self):
         psql_output = pd.read_csv(io.BytesIO(generate_csv_from_psql(fname)))
         self.assertTrue(
             np.allclose(datafusion_output, psql_output),
-            msg=f"data fusion output={datafusion_output}, psql_output={psql_output}",
+            msg=f"datafusion output=\n{datafusion_output}, psql_output=\n{psql_output}",
         )
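
Note (commentary, not part of the patch): the logical-plan hunk stops rebuilding the Window schema by filtering the old field list and instead goes through LogicalPlanBuilder::window, which recomputes the schema from the already-pruned input. The sketch below is a minimal illustration of the invariant involved, not code from the repository; the table and column names, the window field's name and type, and the exact DFField::new signature are assumptions.

```rust
// Sketch only. A Window node's schema is its input schema plus one field per
// window expression, so once the input has been pruned by projection pushdown
// the schema must be recomputed from that pruned input rather than derived by
// filtering the pre-optimization schema.
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_plan::{DFField, DFSchema};

fn window_schema_sketch() -> Result<()> {
    // Input schema after projection pushdown: only the columns still needed
    // by the window expressions and the operators above them (names assumed).
    let pruned_input = DFSchema::new(vec![
        DFField::new(Some("t"), "c1", DataType::Int32, false),
        DFField::new(Some("t"), "c2", DataType::Int32, false),
    ])?;

    // What the window operator emits at runtime: every input column, followed
    // by one column per window expression (field name and type assumed).
    let mut fields = pruned_input.fields().clone();
    fields.push(DFField::new(None, "MAX(t.c2)", DataType::Int32, true));
    let window_schema = DFSchema::new(fields)?;

    assert_eq!(
        window_schema.fields().len(),
        pruned_input.fields().len() + 1
    );
    Ok(())
}
```

The planner.rs and windows.rs hunks apply the same idea on the physical side: WindowAggExec's output schema is built from the input schema it is explicitly handed rather than from whatever input.schema() happens to return.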