Skip to content

Commit

Permalink
Move wildcard expansions to the analyzer (#11681)
Browse files Browse the repository at this point in the history
* allow qualified wildcard in the logical plan

* move wildcard expansions to the analyzer

* fix fmt

* fix the view tests

* expand wildcard for schema

* fix for union query

* cargo fmt clippy

* move wildcard expanding tests to expand_wildcard_rule.rs

* coerce the expanded wildcard expressions in union

* remove debug message

* move wildcard options to logical plan

* remove unused function

* add the doc for expression function

* fix cargo check

* fix cargo fmt

* fix test

* extract expand_exprlist

* expand wildcard for functional_dependencies

* refine the doc

* fix tests

* fix expand exclude and except

* remove unused import

* fix check and update function

* fix check

* throw the error when exprlist to field

* fix functional_dependency and exclude

* fix projection_schema

* fix the window functions

* fix clippy and support unparsing wildcard

* fix clippy and fmt

* add the doc for util functions

* fix unique expression check for projection

* cargo fmt

* move test and solve dependency issue

* address review comments

* add the fail reason

* enhance the doc

* add more doc
  • Loading branch information
goldmedal authored Aug 13, 2024
1 parent e8ac93a commit 3438b35
Show file tree
Hide file tree
Showing 27 changed files with 1,057 additions and 254 deletions.
44 changes: 38 additions & 6 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan},
physical_plan::ExecutionPlan,
};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;

use crate::datasource::{TableProvider, TableType};

Expand All @@ -50,6 +52,7 @@ impl ViewTable {
logical_plan: LogicalPlan,
definition: Option<String>,
) -> Result<Self> {
let logical_plan = Self::apply_required_rule(logical_plan)?;
let table_schema = logical_plan.schema().as_ref().to_owned().into();

let view = Self {
Expand All @@ -61,6 +64,15 @@ impl ViewTable {
Ok(view)
}

/// Runs the analyzer over `logical_plan` with [`ExpandWildcardRule`] as its only rule.
///
/// The analyzer is driven with default [`ConfigOptions`] and an observer callback
/// that does nothing. Any error reported by the analyzer is returned to the caller.
fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
    let config = ConfigOptions::default();
    let analyzer = Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
    analyzer.execute_and_check(logical_plan, &config, |_, _| {})
}

/// Get definition ref
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
Expand Down Expand Up @@ -232,6 +244,26 @@ mod tests {

assert_batches_eq!(expected, &results);

let view_sql =
"CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as column1) FROM xyz";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx
.sql("SELECT * FROM replace_xyz")
.await?
.collect()
.await?;

let expected = [
"+---------+---------+---------+",
"| column1 | column2 | column3 |",
"+---------+---------+---------+",
"| 2 | 2 | 3 |",
"| 8 | 5 | 6 |",
"+---------+---------+---------+",
];

assert_batches_eq!(expected, &results);
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ impl SessionContext {
}
(_, Err(_)) => {
let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name, table)?;
self.return_empty_dataframe()
}
Expand Down
221 changes: 211 additions & 10 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue,
TableReference,
};
use sqlparser::ast::NullTreatment;
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
NullTreatment, RenameSelectItem, ReplaceSelectElement,
};

/// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
///
Expand Down Expand Up @@ -315,7 +318,10 @@ pub enum Expr {
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
Wildcard { qualifier: Option<TableReference> },
Wildcard {
qualifier: Option<TableReference>,
options: WildcardOptions,
},
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
Expand Down Expand Up @@ -970,6 +976,89 @@ impl GroupingSet {
}
}

/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and BigQuery `EXCEPT`.
///
/// Every option is optional. With the derived [`Default`] all of them are `None`,
/// in which case the `Display` implementation writes nothing after the `*`.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct WildcardOptions {
/// `[ILIKE...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub ilike: Option<IlikeSelectItem>,
/// `[EXCLUDE...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub exclude: Option<ExcludeSelectItem>,
/// `[EXCEPT...]`.
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_except>
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#except>
pub except: Option<ExceptSelectItem>,
/// `[REPLACE...]`.
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_replace>
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#replace>
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
///
/// Unlike the other options this is not the raw AST node: it is a
/// [`PlannedReplaceSelectItem`], which holds the original AST elements together
/// with the expressions planned from them.
pub replace: Option<PlannedReplaceSelectItem>,
/// `[RENAME ...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub rename: Option<RenameSelectItem>,
}

impl WildcardOptions {
pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
WildcardOptions {
ilike: self.ilike,
exclude: self.exclude,
except: self.except,
replace: Some(replace),
rename: self.rename,
}
}
}

impl Display for WildcardOptions {
    /// Writes every option that is set, each preceded by a single space, in the
    /// fixed order `ILIKE`, `EXCLUDE`, `EXCEPT`, `REPLACE`, `RENAME`.
    ///
    /// Nothing is written when no option is set.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // The array order is the output order.
        let clauses: [Option<&dyn Display>; 5] = [
            self.ilike.as_ref().map(|item| item as &dyn Display),
            self.exclude.as_ref().map(|item| item as &dyn Display),
            self.except.as_ref().map(|item| item as &dyn Display),
            self.replace.as_ref().map(|item| item as &dyn Display),
            self.rename.as_ref().map(|item| item as &dyn Display),
        ];
        for clause in clauses.iter().flatten() {
            write!(f, " {clause}")?;
        }
        Ok(())
    }
}

/// The planned form of a wildcard `REPLACE` clause.
///
/// Keeps the original AST elements (used when displaying the clause) next to the
/// expressions that were planned from them.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct PlannedReplaceSelectItem {
/// The original AST nodes. These are what the `Display` implementation prints.
pub items: Vec<ReplaceSelectElement>,
/// The expressions planned from the AST nodes. They will be used when expanding the wildcard.
///
/// NOTE(review): presumably one expression per entry in `items`, in the same
/// order — nothing in this type enforces that; confirm against the planner.
pub planned_expressions: Vec<Expr>,
}

impl Display for PlannedReplaceSelectItem {
    /// Writes the clause as `REPLACE (<items>)`, where `<items>` is the original
    /// AST elements rendered by `display_comma_separated`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let elements = display_comma_separated(&self.items);
        write!(f, "REPLACE ({elements})")
    }
}

impl PlannedReplaceSelectItem {
    /// Borrows the original `REPLACE` AST elements.
    pub fn items(&self) -> &[ReplaceSelectElement] {
        self.items.as_slice()
    }

    /// Borrows the expressions that were planned from the AST elements.
    pub fn expressions(&self) -> &[Expr] {
        self.planned_expressions.as_slice()
    }
}

/// Fixed seed for the hashing so that Ords are consistent across runs
const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);

Expand Down Expand Up @@ -1720,8 +1809,9 @@ impl Expr {
Expr::ScalarSubquery(subquery) => {
subquery.hash(hasher);
}
Expr::Wildcard { qualifier } => {
Expr::Wildcard { qualifier, options } => {
qualifier.hash(hasher);
options.hash(hasher);
}
Expr::GroupingSet(grouping_set) => {
mem::discriminant(grouping_set).hash(hasher);
Expand Down Expand Up @@ -2242,9 +2332,9 @@ impl fmt::Display for Expr {
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
}
}
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*"),
None => write!(f, "*"),
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*{options}"),
None => write!(f, "*{options}"),
},
Expr::GroupingSet(grouping_sets) => match grouping_sets {
GroupingSet::Rollup(exprs) => {
Expand Down Expand Up @@ -2543,9 +2633,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
None => Ok(format!("*{}", options)),
},
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expand All @@ -2558,7 +2649,12 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
#[cfg(test)]
mod test {
use crate::expr_fn::col;
use crate::{case, lit, ColumnarValue, ScalarUDF, ScalarUDFImpl, Volatility};
use crate::{
case, lit, qualified_wildcard, wildcard, wildcard_with_options, ColumnarValue,
ScalarUDF, ScalarUDFImpl, Volatility,
};
use sqlparser::ast;
use sqlparser::ast::{Ident, IdentWithAlias};
use std::any::Any;

#[test]
Expand Down Expand Up @@ -2859,4 +2955,109 @@ mod test {
);
assert_eq!(find_df_window_func("not_exist"), None)
}

#[test]
fn test_display_wildcard() {
    // Without options nothing follows the `*`.
    assert_eq!(wildcard().to_string(), "*");
    assert_eq!(qualified_wildcard("t1").to_string(), "t1.*");

    // ILIKE
    let ilike = IlikeSelectItem {
        pattern: "c1".to_string(),
    };
    let with_ilike =
        wildcard_with_options(wildcard_options(Some(ilike), None, None, None, None));
    assert_eq!(with_ilike.to_string(), "* ILIKE 'c1'");

    // EXCLUDE
    let exclude = ExcludeSelectItem::Multiple(vec![Ident::from("c1"), Ident::from("c2")]);
    let with_exclude =
        wildcard_with_options(wildcard_options(None, Some(exclude), None, None, None));
    assert_eq!(with_exclude.to_string(), "* EXCLUDE (c1, c2)");

    // EXCEPT
    let except = ExceptSelectItem {
        first_element: Ident::from("c1"),
        additional_elements: vec![Ident::from("c2")],
    };
    let with_except =
        wildcard_with_options(wildcard_options(None, None, Some(except), None, None));
    assert_eq!(with_except.to_string(), "* EXCEPT (c1, c2)");

    // REPLACE: only the AST items are displayed, so no planned expressions are needed.
    let replace = PlannedReplaceSelectItem {
        items: vec![ReplaceSelectElement {
            expr: ast::Expr::Identifier(Ident::from("c1")),
            column_name: Ident::from("a1"),
            as_keyword: false,
        }],
        planned_expressions: vec![],
    };
    let with_replace =
        wildcard_with_options(wildcard_options(None, None, None, Some(replace), None));
    assert_eq!(with_replace.to_string(), "* REPLACE (c1 a1)");

    // RENAME
    let rename = RenameSelectItem::Multiple(vec![IdentWithAlias {
        ident: Ident::from("c1"),
        alias: Ident::from("a1"),
    }]);
    let with_rename =
        wildcard_with_options(wildcard_options(None, None, None, None, Some(rename)));
    assert_eq!(with_rename.to_string(), "* RENAME (c1 AS a1)");
}

/// Test helper: assembles a [`WildcardOptions`] from one optional value per clause,
/// taken in the order ILIKE, EXCLUDE, EXCEPT, REPLACE, RENAME.
fn wildcard_options(
opt_ilike: Option<IlikeSelectItem>,
opt_exclude: Option<ExcludeSelectItem>,
opt_except: Option<ExceptSelectItem>,
opt_replace: Option<PlannedReplaceSelectItem>,
opt_rename: Option<RenameSelectItem>,
) -> WildcardOptions {
WildcardOptions {
ilike: opt_ilike,
exclude: opt_exclude,
except: opt_except,
replace: opt_replace,
rename: opt_rename,
}
}
}
Loading

0 comments on commit 3438b35

Please sign in to comment.