Skip to content

Commit

Permalink
Merge unparsing improvements from Datafusion main branch (#48)
Browse files Browse the repository at this point in the history
* init (apache#12453)

* Fix unparse table scan with the projection pushdown (apache#12534)

* unparse the projection base on the source schema

* refactor and enhance the test

* Fix unparsing OFFSET (apache#12539)

* Unparse Sort with pushdown limit to SQL string (apache#12873)

* unparse Sort with push down limit

* cargo fmt

* set query limit directly

* Unparse `SubqueryAlias` without projections to SQL (apache#12896)

* change pub function comment to doc

* unparse subquery alias without projections

* fix tests

* rollback the empty line

* rollback the empty line

* exclude the table_scan with pushdown case

* fmt and clippy

* simplify the ast to string and remove unused debug code

* enhance unparsing plan with pushdown to avoid unnamed subquery (apache#13006)

* Update

---------

Co-authored-by: Lordworms <[email protected]>
Co-authored-by: Jax Liu <[email protected]>
Co-authored-by: Justus Flerlage <[email protected]>
  • Loading branch information
4 people authored Oct 24, 2024
1 parent fbedf88 commit 3161715
Show file tree
Hide file tree
Showing 3 changed files with 406 additions and 121 deletions.
166 changes: 134 additions & 32 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use crate::unparser::utils::{
find_unnest_node_within_select, unproject_agg_exprs, unproject_unnest_expr,
};
use datafusion_common::{
internal_err, not_impl_err, Column, DataFusionError, Result, TableReference,
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr,
LogicalPlanBuilder, Projection, SortExpr, TableScan,
};
use sqlparser::ast::{self, Ident, SetExpr};
use std::sync::Arc;
Expand All @@ -36,7 +38,7 @@ use super::{
rewrite::{
inject_column_aliases_into_subquery, normalize_union_schema,
rewrite_plan_for_sort_on_non_projected_fields,
subquery_alias_inner_query_and_columns,
subquery_alias_inner_query_and_columns, TableAliasRewriter,
},
utils::{
find_agg_node_within_select, find_window_nodes_within_select,
Expand Down Expand Up @@ -277,12 +279,9 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if scan.projection.is_some()
|| !scan.filters.is_empty()
|| scan.fetch.is_some()
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
{
let unparsed_table_scan =
Self::unparse_table_scan_pushdown(plan, None)?;
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -366,6 +365,21 @@ impl Unparser<'_> {
))));
}

if limit.skip > 0 {
let Some(query) = query.as_mut() else {
return internal_err!(
"Offset operator only valid in a statement context."
);
};
query.offset(Some(ast::Offset {
rows: ast::OffsetRows::None,
value: ast::Expr::Value(ast::Value::Number(
limit.skip.to_string(),
false,
)),
}));
}

self.select_to_sql_recursively(
limit.input.as_ref(),
query,
Expand All @@ -388,6 +402,13 @@ impl Unparser<'_> {
);
};

if let Some(fetch) = sort.fetch {
query_ref.limit(Some(ast::Expr::Value(ast::Value::Number(
fetch.to_string(),
false,
))));
}

let agg = find_agg_node_within_select(plan, select.already_projected());
// unproject sort expressions
let sort_exprs: Vec<SortExpr> = sort
Expand Down Expand Up @@ -532,10 +553,18 @@ impl Unparser<'_> {
LogicalPlan::SubqueryAlias(plan_alias) => {
let (plan, mut columns) =
subquery_alias_inner_query_and_columns(plan_alias);
let plan = Self::unparse_table_scan_pushdown(
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
if !select.already_projected() && unparsed_table_scan.is_none() {
select.projection(vec![ast::SelectItem::Wildcard(
ast::WildcardAdditionalOptions::default(),
)]);
}
let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
if !columns.is_empty()
&& !self.dialect.supports_column_alias_in_table_alias()
{
Expand Down Expand Up @@ -644,70 +673,143 @@ impl Unparser<'_> {
}
}

fn is_scan_with_pushdown(scan: &TableScan) -> bool {
scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
}

/// Try to unparse a table scan with pushdown operations into a new subquery plan.
/// If the table scan is without any pushdown operations, return None.
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO: support filters for table scan with alias. Remove this check after #12368 issue.
// see the issue: https://github.com/apache/datafusion/issues/12368
if alias.is_some() && !table_scan.filters.is_empty() {
return not_impl_err!(
"Subquery alias is not supported for table scan with pushdown filters"
);
if !Self::is_scan_with_pushdown(table_scan) {
return Ok(None);
}
let table_schema = table_scan.source.schema();
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: &table_schema,
alias_name: alias_name.clone(),
});

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Arc::clone(&table_scan.source),
None,
)?;
// We will rebase the column references to the new alias if it exists.
// If the projection or filters are empty, we will append alias to the table scan.
//
// Example:
// select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
if alias.is_some()
&& (table_scan.projection.is_some() || !table_scan.filters.is_empty())
{
builder = builder.alias(alias.clone().unwrap())?;
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let (qualifier, field) =
table_scan.projected_schema.qualified_field(i);
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(qualifier.cloned(), field.name().clone())
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
if let Some(alias) = alias {
builder = builder.alias(alias)?;
}
builder = builder.project(project_columns)?;
}

let filter_expr = table_scan
let filter_expr: Result<Option<Expr>> = table_scan
.filters
.iter()
.cloned()
.reduce(|acc, expr| acc.and(expr));
if let Some(filter) = filter_expr {
.map(|expr| {
if let Some(ref mut rewriter) = filter_alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.reduce(|acc, expr_result| {
acc.and_then(|acc_expr| {
expr_result.map(|expr| acc_expr.and(expr))
})
})
.transpose();

if let Some(filter) = filter_expr? {
builder = builder.filter(filter)?;
}

if let Some(fetch) = table_scan.fetch {
builder = builder.limit(0, Some(fetch))?;
}

builder.build()
// If the table scan has an alias but no projection or filters, it means no column references are rebased.
// So we will append the alias to this subquery.
// Example:
// select * from t1 limit 10 -> (select * from t1 limit 10) as a
if alias.is_some()
&& (table_scan.projection.is_none() && table_scan.filters.is_empty())
{
builder = builder.alias(alias.clone().unwrap())?;
}

Ok(Some(builder.build()?))
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let new_plan = Self::unparse_table_scan_pushdown(
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
)?;
LogicalPlanBuilder::from(new_plan)
.alias(subquery_alias.alias.clone())?
.build()
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: plan.schema().as_arrow(),
alias_name: alias_name.clone(),
});
projection
.expr
.iter()
.cloned()
.map(|expr| {
if let Some(ref mut rewriter) = alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.collect::<Result<Vec<_>>>()?
} else {
projection.expr.clone()
};
Ok(Some(
LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
))
} else {
Ok(None)
}
}
_ => Ok(plan.clone()),
_ => Ok(None),
}
}

Expand Down
Loading

0 comments on commit 3161715

Please sign in to comment.