Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve unparsing after optimize_projections optimization #13599

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,11 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
{
if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
plan,
None,
select.already_projected(),
)? {
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -585,6 +587,7 @@ impl Unparser<'_> {
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
select.already_projected(),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
Expand Down Expand Up @@ -714,6 +717,7 @@ impl Unparser<'_> {
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
already_projected: bool,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
Expand Down Expand Up @@ -743,24 +747,29 @@ impl Unparser<'_> {
}
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
}
}

let filter_expr: Result<Option<Expr>> = table_scan
Expand Down Expand Up @@ -805,14 +814,17 @@ impl Unparser<'_> {
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
already_projected,
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
if let Some(plan) = Self::unparse_table_scan_pushdown(
&projection.input,
alias.clone(),
already_projected,
)? {
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
Expand Down
17 changes: 15 additions & 2 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ fn test_aggregation_without_projection() -> Result<()> {

assert_eq!(
actual,
r#"SELECT sum(users.age), users."name" FROM (SELECT users."name", users.age FROM users) GROUP BY users."name""#
r#"SELECT sum(users.age), users."name" FROM users GROUP BY users."name""#
);

Ok(())
Expand Down Expand Up @@ -926,12 +926,25 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![wildcard()])?
sgrebnov marked this conversation as resolved.
Show resolved Hide resolved
.project(vec![col("id"), col("age")])?
.build()?;
let query_from_table_scan_with_projection =
plan_to_sql(&query_from_table_scan_with_projection)?;
assert_eq!(
query_from_table_scan_with_projection.to_string(),
"SELECT t1.id, t1.age FROM t1"
);

let query_from_table_scan_with_two_projections = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
.build()?;
let query_from_table_scan_with_two_projections =
plan_to_sql(&query_from_table_scan_with_two_projections)?;
assert_eq!(
query_from_table_scan_with_two_projections.to_string(),
"SELECT * FROM (SELECT t1.id, t1.age FROM t1)"
);

Expand Down