Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge unparsing improvements from Datafusion main branch #48

Merged
merged 7 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 134 additions & 32 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use crate::unparser::utils::{
find_unnest_node_within_select, unproject_agg_exprs, unproject_unnest_expr,
};
use datafusion_common::{
internal_err, not_impl_err, Column, DataFusionError, Result, TableReference,
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr,
LogicalPlanBuilder, Projection, SortExpr, TableScan,
};
use sqlparser::ast::{self, Ident, SetExpr};
use std::sync::Arc;
Expand All @@ -36,7 +38,7 @@ use super::{
rewrite::{
inject_column_aliases_into_subquery, normalize_union_schema,
rewrite_plan_for_sort_on_non_projected_fields,
subquery_alias_inner_query_and_columns,
subquery_alias_inner_query_and_columns, TableAliasRewriter,
},
utils::{
find_agg_node_within_select, find_window_nodes_within_select,
Expand Down Expand Up @@ -277,12 +279,9 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if scan.projection.is_some()
|| !scan.filters.is_empty()
|| scan.fetch.is_some()
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
{
let unparsed_table_scan =
Self::unparse_table_scan_pushdown(plan, None)?;
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -366,6 +365,21 @@ impl Unparser<'_> {
))));
}

if limit.skip > 0 {
let Some(query) = query.as_mut() else {
return internal_err!(
"Offset operator only valid in a statement context."
);
};
query.offset(Some(ast::Offset {
rows: ast::OffsetRows::None,
value: ast::Expr::Value(ast::Value::Number(
limit.skip.to_string(),
false,
)),
}));
}

self.select_to_sql_recursively(
limit.input.as_ref(),
query,
Expand All @@ -388,6 +402,13 @@ impl Unparser<'_> {
);
};

if let Some(fetch) = sort.fetch {
query_ref.limit(Some(ast::Expr::Value(ast::Value::Number(
fetch.to_string(),
false,
))));
}

let agg = find_agg_node_within_select(plan, select.already_projected());
// unproject sort expressions
let sort_exprs: Vec<SortExpr> = sort
Expand Down Expand Up @@ -532,10 +553,18 @@ impl Unparser<'_> {
LogicalPlan::SubqueryAlias(plan_alias) => {
let (plan, mut columns) =
subquery_alias_inner_query_and_columns(plan_alias);
let plan = Self::unparse_table_scan_pushdown(
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
if !select.already_projected() && unparsed_table_scan.is_none() {
select.projection(vec![ast::SelectItem::Wildcard(
ast::WildcardAdditionalOptions::default(),
)]);
}
let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
if !columns.is_empty()
&& !self.dialect.supports_column_alias_in_table_alias()
{
Expand Down Expand Up @@ -644,70 +673,143 @@ impl Unparser<'_> {
}
}

fn is_scan_with_pushdown(scan: &TableScan) -> bool {
scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
}

/// Try to unparse a table scan with pushdown operations into a new subquery plan.
/// If the table scan is without any pushdown operations, return None.
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO: support filters for table scan with alias. Remove this check after #12368 issue.
// see the issue: https://github.com/apache/datafusion/issues/12368
if alias.is_some() && !table_scan.filters.is_empty() {
return not_impl_err!(
"Subquery alias is not supported for table scan with pushdown filters"
);
if !Self::is_scan_with_pushdown(table_scan) {
return Ok(None);
}
let table_schema = table_scan.source.schema();
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: &table_schema,
alias_name: alias_name.clone(),
});

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Arc::clone(&table_scan.source),
None,
)?;
// We will rebase the column references to the new alias if it exists.
// If the projection or filters are empty, we will append alias to the table scan.
//
// Example:
// select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
if alias.is_some()
&& (table_scan.projection.is_some() || !table_scan.filters.is_empty())
{
builder = builder.alias(alias.clone().unwrap())?;
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let (qualifier, field) =
table_scan.projected_schema.qualified_field(i);
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(qualifier.cloned(), field.name().clone())
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
if let Some(alias) = alias {
builder = builder.alias(alias)?;
}
builder = builder.project(project_columns)?;
}

let filter_expr = table_scan
let filter_expr: Result<Option<Expr>> = table_scan
.filters
.iter()
.cloned()
.reduce(|acc, expr| acc.and(expr));
if let Some(filter) = filter_expr {
.map(|expr| {
if let Some(ref mut rewriter) = filter_alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.reduce(|acc, expr_result| {
acc.and_then(|acc_expr| {
expr_result.map(|expr| acc_expr.and(expr))
})
})
.transpose();

if let Some(filter) = filter_expr? {
builder = builder.filter(filter)?;
}

if let Some(fetch) = table_scan.fetch {
builder = builder.limit(0, Some(fetch))?;
}

builder.build()
// If the table scan has an alias but no projection or filters, it means no column references are rebased.
// So we will append the alias to this subquery.
// Example:
// select * from t1 limit 10 -> (select * from t1 limit 10) as a
if alias.is_some()
&& (table_scan.projection.is_none() && table_scan.filters.is_empty())
{
builder = builder.alias(alias.clone().unwrap())?;
}

Ok(Some(builder.build()?))
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let new_plan = Self::unparse_table_scan_pushdown(
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
)?;
LogicalPlanBuilder::from(new_plan)
.alias(subquery_alias.alias.clone())?
.build()
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: plan.schema().as_arrow(),
alias_name: alias_name.clone(),
});
projection
.expr
.iter()
.cloned()
.map(|expr| {
if let Some(ref mut rewriter) = alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.collect::<Result<Vec<_>>>()?
} else {
projection.expr.clone()
};
Ok(Some(
LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
))
} else {
Ok(None)
}
}
_ => Ok(plan.clone()),
_ => Ok(None),
}
}

Expand Down
Loading