From da3f6afec51e4272622cb74ec62a11d13df17267 Mon Sep 17 00:00:00 2001
From: Albert Skalt <133099191+askalt@users.noreply.github.com>
Date: Mon, 26 Aug 2024 22:10:16 +0300
Subject: [PATCH] Add ability to return LogicalPlan by value from TableProvider
 (#12113)

This patch changes the `get_logical_plan(...)` method signature.
Now it returns a `Cow<LogicalPlan>` to allow an implementation to return
plan by value.
---
 datafusion/catalog/src/table.rs               |  5 ++-
 datafusion/core/src/dataframe/mod.rs          |  5 ++-
 .../core/src/datasource/cte_worktable.rs      |  4 +-
 .../src/datasource/default_table_source.rs    |  4 +-
 datafusion/core/src/datasource/view.rs        |  6 +--
 datafusion/expr/src/table_source.rs           |  4 +-
 .../src/analyzer/inline_table_scan.rs         | 43 +++++++++----------
 7 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 792315642a00..69fa81faf8e2 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::any::Any;
+use std::borrow::Cow;
 use std::sync::Arc;
 
 use crate::session::Session;
@@ -56,8 +57,8 @@ pub trait TableProvider: Sync + Send {
         None
     }
 
-    /// Get the [`LogicalPlan`] of this table, if available
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+    /// Get the [`LogicalPlan`] of this table, if available.
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
         None
     }
 
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index a38e7f45a6f1..c516c7985d54 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -21,6 +21,7 @@
 mod parquet;
 
 use std::any::Any;
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -1648,8 +1649,8 @@ impl TableProvider for DataFrameTableProvider {
         self
     }
 
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
-        Some(&self.plan)
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
+        Some(Cow::Borrowed(&self.plan))
     }
 
     fn supports_filters_pushdown(
diff --git a/datafusion/core/src/datasource/cte_worktable.rs b/datafusion/core/src/datasource/cte_worktable.rs
index d7d224828dda..d2da15c64f52 100644
--- a/datafusion/core/src/datasource/cte_worktable.rs
+++ b/datafusion/core/src/datasource/cte_worktable.rs
@@ -17,8 +17,8 @@
 
 //! CteWorkTable implementation used for recursive queries
 
-use std::any::Any;
 use std::sync::Arc;
+use std::{any::Any, borrow::Cow};
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
@@ -63,7 +63,7 @@ impl TableProvider for CteWorkTable {
         self
     }
 
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
         None
     }
 
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 977e681d6641..b4a5a76fc9ff 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -17,8 +17,8 @@
 
 //! Default TableSource implementation used in DataFusion physical plans
 
-use std::any::Any;
 use std::sync::Arc;
+use std::{any::Any, borrow::Cow};
 
 use crate::datasource::TableProvider;
 
@@ -70,7 +70,7 @@ impl TableSource for DefaultTableSource {
         self.table_provider.supports_filters_pushdown(filter)
     }
 
-    fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
+    fn get_logical_plan(&self) -> Option<Cow<datafusion_expr::LogicalPlan>> {
         self.table_provider.get_logical_plan()
     }
 
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index a81942bf769e..947714c1e4f9 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -17,7 +17,7 @@
 
 //! View data source which uses a LogicalPlan as it's input.
 
-use std::{any::Any, sync::Arc};
+use std::{any::Any, borrow::Cow, sync::Arc};
 
 use crate::{
     error::Result,
@@ -90,8 +90,8 @@ impl TableProvider for ViewTable {
         self
     }
 
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
-        Some(&self.logical_plan)
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
+        Some(Cow::Borrowed(&self.logical_plan))
     }
 
     fn schema(&self) -> SchemaRef {
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 2de3cc923315..8b8d2dfcf2df 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -22,7 +22,7 @@ use crate::{Expr, LogicalPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{Constraints, Result};
 
-use std::any::Any;
+use std::{any::Any, borrow::Cow};
 
 /// Indicates how a filter expression is handled by
 /// [`TableProvider::scan`].
@@ -122,7 +122,7 @@ pub trait TableSource: Sync + Send {
     }
 
     /// Get the Logical plan of this table provider, if available.
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
         None
     }
 
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index b69b8410da49..d5b3648725b9 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -24,7 +24,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Column, Result};
 use datafusion_expr::expr::WildcardOptions;
-use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
+use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder};
 
 /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
 /// (DataFrame / ViewTable)
@@ -56,24 +56,23 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
         match plan {
             // Match only on scans without filter / projection / fetch
             // Views and DataFrames won't have those added
-            // during the early stage of planning
-            LogicalPlan::TableScan(TableScan {
-                table_name,
-                source,
-                projection,
-                filters,
-                ..
-            }) if filters.is_empty() && source.get_logical_plan().is_some() => {
-                let sub_plan = source.get_logical_plan().unwrap();
-                let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
-                LogicalPlanBuilder::from(sub_plan.clone())
-                    .project(projection_exprs)?
-                    // Ensures that the reference to the inlined table remains the
-                    // same, meaning we don't have to change any of the parent nodes
-                    // that reference this table.
-                    .alias(table_name)?
-                    .build()
-                    .map(Transformed::yes)
+            // during the early stage of planning.
+            LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
+                if let Some(sub_plan) = table_scan.source.get_logical_plan() {
+                    let sub_plan = sub_plan.into_owned();
+                    let projection_exprs =
+                        generate_projection_expr(&table_scan.projection, &sub_plan)?;
+                    LogicalPlanBuilder::from(sub_plan)
+                        .project(projection_exprs)?
+                        // Ensures that the reference to the inlined table remains the
+                        // same, meaning we don't have to change any of the parent nodes
+                        // that reference this table.
+                        .alias(table_scan.table_name)?
+                        .build()
+                        .map(Transformed::yes)
+                } else {
+                    Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
+                }
             }
             _ => Ok(Transformed::no(plan)),
         }
@@ -104,7 +103,7 @@ fn generate_projection_expr(
 
 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, vec};
+    use std::{borrow::Cow, sync::Arc, vec};
 
     use crate::analyzer::inline_table_scan::InlineTableScan;
     use crate::test::assert_analyzed_plan_eq;
@@ -167,8 +166,8 @@ mod tests {
             Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
         }
 
-        fn get_logical_plan(&self) -> Option<&LogicalPlan> {
-            Some(&self.plan)
+        fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
+            Some(Cow::Borrowed(&self.plan))
         }
     }