Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unparsing UNNEST plan to UNNEST table factor SQL #13660

Merged
merged 7 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions datafusion/sql/src/unparser/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ pub(super) struct RelationBuilder {
enum TableFactorBuilder {
Table(TableRelationBuilder),
Derived(DerivedRelationBuilder),
Unnest(UnnestRelationBuilder),
Empty,
}

Expand All @@ -369,6 +370,12 @@ impl RelationBuilder {
self.relation = Some(TableFactorBuilder::Derived(value));
self
}

pub fn unnest(&mut self, value: UnnestRelationBuilder) -> &mut Self {
self.relation = Some(TableFactorBuilder::Unnest(value));
self
}

pub fn empty(&mut self) -> &mut Self {
self.relation = Some(TableFactorBuilder::Empty);
self
Expand All @@ -382,6 +389,9 @@ impl RelationBuilder {
Some(TableFactorBuilder::Derived(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::Unnest(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::Empty) => (),
None => (),
}
Expand All @@ -391,6 +401,7 @@ impl RelationBuilder {
Ok(match self.relation {
Some(TableFactorBuilder::Table(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Derived(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Unnest(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Empty) => None,
None => return Err(Into::into(UninitializedFieldError::from("relation"))),
})
Expand Down Expand Up @@ -526,6 +537,68 @@ impl Default for DerivedRelationBuilder {
}
}

#[derive(Clone)]
pub(super) struct UnnestRelationBuilder {
pub alias: Option<ast::TableAlias>,
pub array_exprs: Vec<ast::Expr>,
with_offset: bool,
with_offset_alias: Option<ast::Ident>,
with_ordinality: bool,
}

#[allow(dead_code)]
impl UnnestRelationBuilder {
pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
self.alias = value;
self
}
pub fn array_exprs(&mut self, value: Vec<ast::Expr>) -> &mut Self {
self.array_exprs = value;
self
}

pub fn with_offset(&mut self, value: bool) -> &mut Self {
self.with_offset = value;
self
}

pub fn with_offset_alias(&mut self, value: Option<ast::Ident>) -> &mut Self {
self.with_offset_alias = value;
self
}

pub fn with_ordinality(&mut self, value: bool) -> &mut Self {
self.with_ordinality = value;
self
}

pub fn build(&self) -> Result<ast::TableFactor, BuilderError> {
Ok(ast::TableFactor::UNNEST {
alias: self.alias.clone(),
array_exprs: self.array_exprs.clone(),
with_offset: self.with_offset,
with_offset_alias: self.with_offset_alias.clone(),
with_ordinality: self.with_ordinality,
})
}

fn create_empty() -> Self {
Self {
alias: Default::default(),
array_exprs: Default::default(),
with_offset: Default::default(),
with_offset_alias: Default::default(),
with_ordinality: Default::default(),
}
}
}

impl Default for UnnestRelationBuilder {
fn default() -> Self {
Self::create_empty()
}
}

/// Runtime error when a `build()` method is called and one or more required fields
/// do not have a value.
#[derive(Debug, Clone)]
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sql/src/unparser/dialect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ pub trait Dialect: Send + Sync {
fn full_qualified_col(&self) -> bool {
false
}

/// Allow to unparse the unnest plan as [ast::TableFactor::UNNEST].
///
/// Some dialects like BigQuery require UNNEST to be used in the FROM clause but
/// the LogicalPlan planner always puts UNNEST in the SELECT clause. This flag allows
/// to unparse the UNNEST plan as [ast::TableFactor::UNNEST] instead of a subquery.
fn unnest_as_table_factor(&self) -> bool {
false
}
}

/// `IntervalStyle` to use for unparsing
Expand Down Expand Up @@ -448,6 +457,7 @@ pub struct CustomDialect {
requires_derived_table_alias: bool,
division_operator: BinaryOperator,
full_qualified_col: bool,
unnest_as_table_factor: bool,
}

impl Default for CustomDialect {
Expand All @@ -474,6 +484,7 @@ impl Default for CustomDialect {
requires_derived_table_alias: false,
division_operator: BinaryOperator::Divide,
full_qualified_col: false,
unnest_as_table_factor: false,
}
}
}
Expand Down Expand Up @@ -582,6 +593,10 @@ impl Dialect for CustomDialect {
fn full_qualified_col(&self) -> bool {
self.full_qualified_col
}

fn unnest_as_table_factor(&self) -> bool {
self.unnest_as_table_factor
}
}

/// `CustomDialectBuilder` to build `CustomDialect` using builder pattern
Expand Down Expand Up @@ -617,6 +632,7 @@ pub struct CustomDialectBuilder {
requires_derived_table_alias: bool,
division_operator: BinaryOperator,
full_qualified_col: bool,
unnest_as_table_factor: bool,
}

impl Default for CustomDialectBuilder {
Expand Down Expand Up @@ -649,6 +665,7 @@ impl CustomDialectBuilder {
requires_derived_table_alias: false,
division_operator: BinaryOperator::Divide,
full_qualified_col: false,
unnest_as_table_factor: false,
}
}

Expand All @@ -673,6 +690,7 @@ impl CustomDialectBuilder {
requires_derived_table_alias: self.requires_derived_table_alias,
division_operator: self.division_operator,
full_qualified_col: self.full_qualified_col,
unnest_as_table_factor: self.unnest_as_table_factor,
}
}

Expand Down Expand Up @@ -800,4 +818,9 @@ impl CustomDialectBuilder {
self.full_qualified_col = full_qualified_col;
self
}

pub fn with_unnest_as_table_factor(mut self, _unnest_as_table_factor: bool) -> Self {
self.unnest_as_table_factor = _unnest_as_table_factor;
self
}
}
55 changes: 53 additions & 2 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ use super::{
},
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
};
use sqlparser::ast::{self, Ident, SetExpr};
use std::sync::Arc;
Expand Down Expand Up @@ -312,6 +314,19 @@ impl Unparser<'_> {
.select_to_sql_recursively(&new_plan, query, select, relation);
}

// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
}
}

// Projection can be top-level plan for derived table
if select.already_projected() {
return self.derive_with_dialect_alias(
Expand Down Expand Up @@ -678,7 +693,11 @@ impl Unparser<'_> {
)
}
LogicalPlan::EmptyRelation(_) => {
relation.empty();
// An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
// a TableRelationBuilder will be created for the UNNEST node first.
if !relation.has_relation() {
relation.empty();
}
Ok(())
}
LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"),
Expand Down Expand Up @@ -708,6 +727,38 @@ impl Unparser<'_> {
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
}
}
false
}

fn unnest_to_table_factor_sql(
&self,
unnest: &Unnest,
query: &mut Option<QueryBuilder>,
select: &mut SelectBuilder,
relation: &mut RelationBuilder,
) -> Result<()> {
let mut unnest_relation = UnnestRelationBuilder::default();
let LogicalPlan::Projection(p) = unnest.input.as_ref() else {
return internal_err!("Unnest input is not a Projection: {unnest:?}");
};
let exprs = p
.expr
.iter()
.map(|e| self.expr_to_sql(e))
.collect::<Result<Vec<_>>>()?;
unnest_relation.array_exprs(exprs);
relation.unnest(unnest_relation);
self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
}

fn is_scan_with_pushdown(scan: &TableScan) -> bool {
scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub(crate) fn find_window_nodes_within_select<'a>(

/// Recursively identify Column expressions and transform them into the appropriate unnest expression
///
/// For example, if expr contains the column expr "unnest_placeholder(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL),depth=1)"
/// For example, if expr contains the column expr "__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL),depth=1)"
/// it will be transformed into an actual unnest expression UNNEST([1, 2, 2, 5, NULL])
pub(crate) fn unproject_unnest_expr(expr: Expr, unnest: &Unnest) -> Result<Expr> {
expr.transform(|sub_expr| {
Expand Down
Loading
Loading