From 56278756564df473ae329311170ff8426487dde1 Mon Sep 17 00:00:00 2001 From: DreaMer963 Date: Thu, 28 Oct 2021 01:09:41 +0800 Subject: [PATCH] Fix logical plan optimization will execute twice in SQL mode --- ballista/rust/core/src/serde/physical_plan/from_proto.rs | 1 + datafusion/src/execution/context.rs | 5 +++++ datafusion/src/execution/dataframe_impl.rs | 6 +++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index dce354ac69fa..e2e0c7545118 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -666,6 +666,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), object_store_registry: Arc::new(ObjectStoreRegistry::new()), + has_optimized: true, }; let fun_expr = functions::create_physical_fun( diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index bb23b3f4e91c..d68358686928 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -176,6 +176,7 @@ impl ExecutionContext { config, execution_props: ExecutionProps::new(), object_store_registry: Arc::new(ObjectStoreRegistry::new()), + has_optimized: false, })), } } @@ -767,6 +768,7 @@ impl ExecutionContext { observer(&new_plan, optimizer.as_ref()); } debug!("Optimized logical plan:\n {:?}", new_plan); + state.has_optimized = true; Ok(new_plan) } } @@ -1037,6 +1039,8 @@ pub struct ExecutionContextState { pub execution_props: ExecutionProps, /// Object Store that are registered with the context pub object_store_registry: Arc, + /// If Logical Plan has optimized, it will be true + pub has_optimized: bool, } impl ExecutionProps { @@ -1065,6 +1069,7 @@ impl ExecutionContextState { config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), object_store_registry: Arc::new(ObjectStoreRegistry::new()), + has_optimized: false, } } diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index a313cc170a40..c2253c7bc51f 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -56,8 +56,12 @@ impl DataFrameImpl { /// Create a physical plan async fn create_physical_plan(&self) -> Result> { let state = self.ctx_state.lock().unwrap().clone(); + let has_optimized = state.has_optimized; let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); - let plan = ctx.optimize(&self.plan)?; + let mut plan: LogicalPlan = self.plan.clone(); + if !has_optimized { + plan = ctx.optimize(&self.plan)?; + } ctx.create_physical_plan(&plan).await } }