Skip to content

Commit

Permalink
Add ability to return LogicalPlan by value from TableProvider (#12113)
Browse files Browse the repository at this point in the history
This patch changes the `get_logical_plan(...)` method signature.
Now it returns a `Cow<LogicalPlan>` to allow an implementation to return
plan by value.
  • Loading branch information
askalt authored Aug 26, 2024
1 parent 533ddcb commit da3f6af
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 35 deletions.
5 changes: 3 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use crate::session::Session;
Expand Down Expand Up @@ -56,8 +57,8 @@ pub trait TableProvider: Sync + Send {
None
}

/// Get the [`LogicalPlan`] of this table, if available
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
/// Get the [`LogicalPlan`] of this table, if available.
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod parquet;

use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -1648,8 +1649,8 @@ impl TableProvider for DataFrameTableProvider {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.plan))
}

fn supports_filters_pushdown(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! CteWorkTable implementation used for recursive queries
use std::any::Any;
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl TableProvider for CteWorkTable {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Default TableSource implementation used in DataFusion physical plans
use std::any::Any;
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use crate::datasource::TableProvider;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl TableSource for DefaultTableSource {
self.table_provider.supports_filters_pushdown(filter)
}

fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<datafusion_expr::LogicalPlan>> {
self.table_provider.get_logical_plan()
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! View data source which uses a LogicalPlan as it's input.
use std::{any::Any, sync::Arc};
use std::{any::Any, borrow::Cow, sync::Arc};

use crate::{
error::Result,
Expand Down Expand Up @@ -90,8 +90,8 @@ impl TableProvider for ViewTable {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.logical_plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.logical_plan))
}

fn schema(&self) -> SchemaRef {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{Expr, LogicalPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{Constraints, Result};

use std::any::Any;
use std::{any::Any, borrow::Cow};

/// Indicates how a filter expression is handled by
/// [`TableProvider::scan`].
Expand Down Expand Up @@ -122,7 +122,7 @@ pub trait TableSource: Sync + Send {
}

/// Get the Logical plan of this table provider, if available.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
43 changes: 21 additions & 22 deletions datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder};

/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
/// (DataFrame / ViewTable)
Expand Down Expand Up @@ -56,24 +56,23 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
filters,
..
}) if filters.is_empty() && source.get_logical_plan().is_some() => {
let sub_plan = source.get_logical_plan().unwrap();
let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_name)?
.build()
.map(Transformed::yes)
// during the early stage of planning.
LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
if let Some(sub_plan) = table_scan.source.get_logical_plan() {
let sub_plan = sub_plan.into_owned();
let projection_exprs =
generate_projection_expr(&table_scan.projection, &sub_plan)?;
LogicalPlanBuilder::from(sub_plan)
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_scan.table_name)?
.build()
.map(Transformed::yes)
} else {
Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
}
}
_ => Ok(Transformed::no(plan)),
}
Expand Down Expand Up @@ -104,7 +103,7 @@ fn generate_projection_expr(

#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};
use std::{borrow::Cow, sync::Arc, vec};

use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::test::assert_analyzed_plan_eq;
Expand Down Expand Up @@ -167,8 +166,8 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.plan))
}
}

Expand Down

0 comments on commit da3f6af

Please sign in to comment.