Skip to content

Commit

Permalink
Support unparsing UNNEST plan to UNNEST table factor SQL (apache#…
Browse files Browse the repository at this point in the history
…13660)

* add `unnest_as_table_factor` and `UnnestRelationBuilder`

* unparse unnest as table factor

* fix typo

* add tests for the default configs

* add a static const for unnest_placeholder

* fix tests

* fix tests
  • Loading branch information
goldmedal authored and zhuliquan committed Dec 15, 2024
1 parent 1ab089e commit 2b65fb3
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 74 deletions.
73 changes: 73 additions & 0 deletions datafusion/sql/src/unparser/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ pub(super) struct RelationBuilder {
enum TableFactorBuilder {
Table(TableRelationBuilder),
Derived(DerivedRelationBuilder),
Unnest(UnnestRelationBuilder),
Empty,
}

Expand All @@ -369,6 +370,12 @@ impl RelationBuilder {
self.relation = Some(TableFactorBuilder::Derived(value));
self
}

pub fn unnest(&mut self, value: UnnestRelationBuilder) -> &mut Self {
self.relation = Some(TableFactorBuilder::Unnest(value));
self
}

pub fn empty(&mut self) -> &mut Self {
self.relation = Some(TableFactorBuilder::Empty);
self
Expand All @@ -382,6 +389,9 @@ impl RelationBuilder {
Some(TableFactorBuilder::Derived(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::Unnest(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::Empty) => (),
None => (),
}
Expand All @@ -391,6 +401,7 @@ impl RelationBuilder {
Ok(match self.relation {
Some(TableFactorBuilder::Table(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Derived(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Unnest(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Empty) => None,
None => return Err(Into::into(UninitializedFieldError::from("relation"))),
})
Expand Down Expand Up @@ -526,6 +537,68 @@ impl Default for DerivedRelationBuilder {
}
}

#[derive(Clone)]
pub(super) struct UnnestRelationBuilder {
pub alias: Option<ast::TableAlias>,
pub array_exprs: Vec<ast::Expr>,
with_offset: bool,
with_offset_alias: Option<ast::Ident>,
with_ordinality: bool,
}

#[allow(dead_code)]
impl UnnestRelationBuilder {
pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
self.alias = value;
self
}
pub fn array_exprs(&mut self, value: Vec<ast::Expr>) -> &mut Self {
self.array_exprs = value;
self
}

pub fn with_offset(&mut self, value: bool) -> &mut Self {
self.with_offset = value;
self
}

pub fn with_offset_alias(&mut self, value: Option<ast::Ident>) -> &mut Self {
self.with_offset_alias = value;
self
}

pub fn with_ordinality(&mut self, value: bool) -> &mut Self {
self.with_ordinality = value;
self
}

pub fn build(&self) -> Result<ast::TableFactor, BuilderError> {
Ok(ast::TableFactor::UNNEST {
alias: self.alias.clone(),
array_exprs: self.array_exprs.clone(),
with_offset: self.with_offset,
with_offset_alias: self.with_offset_alias.clone(),
with_ordinality: self.with_ordinality,
})
}

fn create_empty() -> Self {
Self {
alias: Default::default(),
array_exprs: Default::default(),
with_offset: Default::default(),
with_offset_alias: Default::default(),
with_ordinality: Default::default(),
}
}
}

impl Default for UnnestRelationBuilder {
fn default() -> Self {
Self::create_empty()
}
}

/// Runtime error when a `build()` method is called and one or more required fields
/// do not have a value.
#[derive(Debug, Clone)]
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sql/src/unparser/dialect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ pub trait Dialect: Send + Sync {
fn full_qualified_col(&self) -> bool {
false
}

/// Allow to unparse the unnest plan as [ast::TableFactor::UNNEST].
///
/// Some dialects like BigQuery require UNNEST to be used in the FROM clause but
/// the LogicalPlan planner always puts UNNEST in the SELECT clause. This flag allows
/// to unparse the UNNEST plan as [ast::TableFactor::UNNEST] instead of a subquery.
fn unnest_as_table_factor(&self) -> bool {
false
}
}

/// `IntervalStyle` to use for unparsing
Expand Down Expand Up @@ -448,6 +457,7 @@ pub struct CustomDialect {
requires_derived_table_alias: bool,
division_operator: BinaryOperator,
full_qualified_col: bool,
unnest_as_table_factor: bool,
}

impl Default for CustomDialect {
Expand All @@ -474,6 +484,7 @@ impl Default for CustomDialect {
requires_derived_table_alias: false,
division_operator: BinaryOperator::Divide,
full_qualified_col: false,
unnest_as_table_factor: false,
}
}
}
Expand Down Expand Up @@ -582,6 +593,10 @@ impl Dialect for CustomDialect {
fn full_qualified_col(&self) -> bool {
self.full_qualified_col
}

fn unnest_as_table_factor(&self) -> bool {
self.unnest_as_table_factor
}
}

/// `CustomDialectBuilder` to build `CustomDialect` using builder pattern
Expand Down Expand Up @@ -617,6 +632,7 @@ pub struct CustomDialectBuilder {
requires_derived_table_alias: bool,
division_operator: BinaryOperator,
full_qualified_col: bool,
unnest_as_table_factor: bool,
}

impl Default for CustomDialectBuilder {
Expand Down Expand Up @@ -649,6 +665,7 @@ impl CustomDialectBuilder {
requires_derived_table_alias: false,
division_operator: BinaryOperator::Divide,
full_qualified_col: false,
unnest_as_table_factor: false,
}
}

Expand All @@ -673,6 +690,7 @@ impl CustomDialectBuilder {
requires_derived_table_alias: self.requires_derived_table_alias,
division_operator: self.division_operator,
full_qualified_col: self.full_qualified_col,
unnest_as_table_factor: self.unnest_as_table_factor,
}
}

Expand Down Expand Up @@ -800,4 +818,9 @@ impl CustomDialectBuilder {
self.full_qualified_col = full_qualified_col;
self
}

pub fn with_unnest_as_table_factor(mut self, _unnest_as_table_factor: bool) -> Self {
self.unnest_as_table_factor = _unnest_as_table_factor;
self
}
}
55 changes: 53 additions & 2 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ use super::{
},
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
};
use sqlparser::ast::{self, Ident, SetExpr};
use std::sync::Arc;
Expand Down Expand Up @@ -312,6 +314,19 @@ impl Unparser<'_> {
.select_to_sql_recursively(&new_plan, query, select, relation);
}

// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
}
}

// Projection can be top-level plan for derived table
if select.already_projected() {
return self.derive_with_dialect_alias(
Expand Down Expand Up @@ -678,7 +693,11 @@ impl Unparser<'_> {
)
}
LogicalPlan::EmptyRelation(_) => {
relation.empty();
// An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
// a TableRelationBuilder will be created for the UNNEST node first.
if !relation.has_relation() {
relation.empty();
}
Ok(())
}
LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"),
Expand Down Expand Up @@ -708,6 +727,38 @@ impl Unparser<'_> {
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
}
}
false
}

fn unnest_to_table_factor_sql(
&self,
unnest: &Unnest,
query: &mut Option<QueryBuilder>,
select: &mut SelectBuilder,
relation: &mut RelationBuilder,
) -> Result<()> {
let mut unnest_relation = UnnestRelationBuilder::default();
let LogicalPlan::Projection(p) = unnest.input.as_ref() else {
return internal_err!("Unnest input is not a Projection: {unnest:?}");
};
let exprs = p
.expr
.iter()
.map(|e| self.expr_to_sql(e))
.collect::<Result<Vec<_>>>()?;
unnest_relation.array_exprs(exprs);
relation.unnest(unnest_relation);
self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
}

fn is_scan_with_pushdown(scan: &TableScan) -> bool {
scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub(crate) fn find_window_nodes_within_select<'a>(

/// Recursively identify Column expressions and transform them into the appropriate unnest expression
///
/// For example, if expr contains the column expr "unnest_placeholder(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL),depth=1)"
/// For example, if expr contains the column expr "__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL),depth=1)"
/// it will be transformed into an actual unnest expression UNNEST([1, 2, 2, 5, NULL])
pub(crate) fn unproject_unnest_expr(expr: Expr, unnest: &Unnest) -> Result<Expr> {
expr.transform(|sub_expr| {
Expand Down
Loading

0 comments on commit 2b65fb3

Please sign in to comment.