From 40652632f075230318285258e5b50fde00fc7494 Mon Sep 17 00:00:00 2001 From: Albert Skalt Date: Thu, 22 Aug 2024 18:19:34 +0300 Subject: [PATCH] Add ability to return `LogicalPlan` by value from `TableProvider` This patch adds the ability to return a `LogicalPlan` by value from a `TableProvider`. The motivation for this change is as follows: let's assume that we have `TableProvider` that supports concurrent modifications of the inner logical plan (for example, mutable VIEW), and it cannot afford to provide a reference for inlining. Since cloning occurs during inlining anyway, a corresponding method was added to extend the API, along with its default implementation. --- datafusion/catalog/src/table.rs | 7 +++- datafusion/expr/src/table_source.rs | 5 +++ .../src/analyzer/inline_table_scan.rs | 36 +++++++++---------- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 792315642a001..dd9c81e3c9d84 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -56,7 +56,12 @@ pub trait TableProvider: Sync + Send { None } - /// Get the [`LogicalPlan`] of this table, if available + /// Get the [`LogicalPlan`] clone of this table, if available. + fn get_logical_plan_cloned(&self) -> Option { + self.get_logical_plan().cloned() + } + + /// Get the [`LogicalPlan`] of this table, if available. fn get_logical_plan(&self) -> Option<&LogicalPlan> { None } diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index 2de3cc9233155..72f8e1ba08c1f 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -121,6 +121,11 @@ pub trait TableSource: Sync + Send { .collect() } + /// Get the Logical plan clone of this table provider, if available. + fn get_logical_plan_cloned(&self) -> Option { + self.get_logical_plan().cloned() + } + /// Get the Logical plan of this table provider, if available. fn get_logical_plan(&self) -> Option<&LogicalPlan> { None diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index b69b8410da494..0d439802f19a4 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -24,7 +24,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; use datafusion_expr::expr::WildcardOptions; -use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan}; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder}; /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`] /// (DataFrame / ViewTable) @@ -56,24 +56,22 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { match plan { // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added - // during the early stage of planning - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - filters, - .. - }) if filters.is_empty() && source.get_logical_plan().is_some() => { - let sub_plan = source.get_logical_plan().unwrap(); - let projection_exprs = generate_projection_expr(&projection, sub_plan)?; - LogicalPlanBuilder::from(sub_plan.clone()) - .project(projection_exprs)? - // Ensures that the reference to the inlined table remains the - // same, meaning we don't have to change any of the parent nodes - // that reference this table. - .alias(table_name)? - .build() - .map(Transformed::yes) + // during the early stage of planning. + LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => { + if let Some(sub_plan) = table_scan.source.get_logical_plan_cloned() { + let projection_exprs = + generate_projection_expr(&table_scan.projection, &sub_plan)?; + LogicalPlanBuilder::from(sub_plan) + .project(projection_exprs)? + // Ensures that the reference to the inlined table remains the + // same, meaning we don't have to change any of the parent nodes + // that reference this table. + .alias(table_scan.table_name)? + .build() + .map(Transformed::yes) + } else { + Ok(Transformed::no(LogicalPlan::TableScan(table_scan))) + } } _ => Ok(Transformed::no(plan)), }