Skip to content

Commit

Permalink
build(deps): upgrade sqlparser to 0.47.0 (apache#10392)
Browse files Browse the repository at this point in the history
* build(deps): upgrade sqlparser to 0.46.0

Signed-off-by: tison <[email protected]>

* function and cast fixups

* catchup refactors

Signed-off-by: tison <[email protected]>

* try migrate json expr

Signed-off-by: tison <[email protected]>

* Update for changes in sqlparser

* Update dependencies

* handle zero argument form

* fmt

* fixup more

Signed-off-by: tison <[email protected]>

* fixup more

Signed-off-by: tison <[email protected]>

* try use jmhain's branch

Signed-off-by: tison <[email protected]>

* fix compile FunctionArgumentClause exhausted

Signed-off-by: tison <[email protected]>

* fix compile set multi vars

Signed-off-by: tison <[email protected]>

* fix compile new string values

Signed-off-by: tison <[email protected]>

* fix compile set multi vars

Signed-off-by: tison <[email protected]>

* fix compile Subscript

Signed-off-by: tison <[email protected]>

* cargo fmt

Signed-off-by: tison <[email protected]>

* revert workaround on values

Signed-off-by: tison <[email protected]>

* Rework field access

* update lock

* fix doc

* try catchup new sqlparser version

Signed-off-by: tison <[email protected]>

* fixup timezone expr

Signed-off-by: tison <[email protected]>

* fixup params

Signed-off-by: tison <[email protected]>

* lock

Signed-off-by: tison <[email protected]>

* Update to sqlparser 0.47.0

* Update rust stack size on windows

* Revert "Update rust stack size on windows"

This reverts commit b5743d5.

* Add test + support for `$$` function definition

* Disable failing windows CI test

* fmt

* simplify test

* fmt

---------

Signed-off-by: tison <[email protected]>
Co-authored-by: Joey Hain <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Jun 6, 2024
1 parent 089b232 commit f054586
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 429 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.21.0"
serde_json = "1"
sqlparser = { version = "0.45.0", features = ["visitor"] }
sqlparser = { version = "0.47", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ fn init() {
let _ = env_logger::try_init();
}

// Disabled due to https://github.com/apache/datafusion/issues/10793
#[cfg(not(target_family = "windows"))]
#[rstest]
#[case::exec_from_commands(
["--command", "select 1", "--format", "json", "-q"],
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
name: definition.name,
expr: definition
.params
.return_
.function_body
.expect("Expression has to be defined!"),
return_type: definition
.return_type
Expand Down
112 changes: 98 additions & 14 deletions datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ use datafusion_common::cast::{as_float64_array, as_int32_array};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, internal_err,
not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue,
not_impl_err, plan_err, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue,
};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, Volatility,
Accumulator, ColumnarValue, CreateFunction, CreateFunctionBody, ExprSchemable,
LogicalPlanBuilder, OperateFunctionArg, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
};
use datafusion_functions_array::range::range_udf;
use parking_lot::Mutex;
use sqlparser::ast::Ident;

/// test that casting happens on udfs.
/// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
Expand Down Expand Up @@ -828,7 +831,7 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
name: definition.name,
expr: definition
.params
.return_
.function_body
.expect("Expression has to be defined!"),
return_type: definition
.return_type
Expand All @@ -852,15 +855,7 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
#[tokio::test]
async fn create_scalar_function_from_sql_statement() -> Result<()> {
let function_factory = Arc::new(CustomFunctionFactory::default());
let runtime_config = RuntimeConfig::new();
let runtime_environment = RuntimeEnv::new(runtime_config)?;

let session_config = SessionConfig::new();
let state =
SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
.with_function_factory(function_factory.clone());

let ctx = SessionContext::new_with_state(state);
let ctx = SessionContext::new().with_function_factory(function_factory.clone());
let options = SQLOptions::new().with_allow_ddl(false);

let sql = r#"
Expand Down Expand Up @@ -926,6 +921,95 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> {
Ok(())
}

/// Saves whatever is passed to it as a scalar function
#[derive(Debug, Default)]
struct RecordingFunctonFactory {
calls: Mutex<Vec<CreateFunction>>,
}

impl RecordingFunctonFactory {
fn new() -> Self {
Self::default()
}

/// return all the calls made to the factory
fn calls(&self) -> Vec<CreateFunction> {
self.calls.lock().clone()
}
}

#[async_trait::async_trait]
impl FunctionFactory for RecordingFunctonFactory {
async fn create(
&self,
_state: &SessionState,
statement: CreateFunction,
) -> Result<RegisterFunction> {
self.calls.lock().push(statement);

let udf = range_udf();
Ok(RegisterFunction::Scalar(udf))
}
}

#[tokio::test]
async fn create_scalar_function_from_sql_statement_postgres_syntax() -> Result<()> {
let function_factory = Arc::new(RecordingFunctonFactory::new());
let ctx = SessionContext::new().with_function_factory(function_factory.clone());

let sql = r#"
CREATE FUNCTION strlen(name TEXT)
RETURNS int LANGUAGE plrust AS
$$
Ok(Some(name.unwrap().len() as i32))
$$;
"#;

let body = "
Ok(Some(name.unwrap().len() as i32))
";

match ctx.sql(sql).await {
Ok(_) => {}
Err(e) => {
panic!("Error creating function: {}", e);
}
}

// verify that the call was passed through
let calls = function_factory.calls();
let schema = DFSchema::try_from(Schema::empty())?;
assert_eq!(calls.len(), 1);
let call = &calls[0];
let expected = CreateFunction {
or_replace: false,
temporary: false,
name: "strlen".into(),
args: Some(vec![OperateFunctionArg {
name: Some(Ident {
value: "name".into(),
quote_style: None,
}),
data_type: DataType::Utf8,
default_expr: None,
}]),
return_type: Some(DataType::Int32),
params: CreateFunctionBody {
language: Some(Ident {
value: "plrust".into(),
quote_style: None,
}),
behavior: None,
function_body: Some(lit(body)),
},
schema: Arc::new(schema),
};

assert_eq!(call, &expected);

Ok(())
}

fn create_udf_context() -> SessionContext {
let ctx = SessionContext::new();
// register a custom UDF
Expand Down
25 changes: 2 additions & 23 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,29 +341,8 @@ pub struct CreateFunctionBody {
pub language: Option<Ident>,
/// IMMUTABLE | STABLE | VOLATILE
pub behavior: Option<Volatility>,
/// AS 'definition'
pub as_: Option<DefinitionStatement>,
/// RETURN expression
pub return_: Option<Expr>,
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum DefinitionStatement {
SingleQuotedDef(String),
DoubleDollarDef(String),
}

impl From<sqlparser::ast::FunctionDefinition> for DefinitionStatement {
fn from(value: sqlparser::ast::FunctionDefinition) -> Self {
match value {
sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => {
Self::SingleQuotedDef(s)
}
sqlparser::ast::FunctionDefinition::DoubleDollarDef(s) => {
Self::DoubleDollarDef(s)
}
}
}
/// RETURN or AS function body
pub function_body: Option<Expr>,
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub use builder::{
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DefinitionStatement,
DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema,
DropFunction, DropTable, DropView, OperateFunctionArg,
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sql/src/expr/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
BinaryOperator::PGBitwiseShiftRight => Ok(Operator::BitwiseShiftRight),
BinaryOperator::PGBitwiseShiftLeft => Ok(Operator::BitwiseShiftLeft),
BinaryOperator::StringConcat => Ok(Operator::StringConcat),
BinaryOperator::ArrowAt => Ok(Operator::ArrowAt),
BinaryOperator::AtArrow => Ok(Operator::AtArrow),
_ => not_impl_err!("Unsupported SQL binary operator {op:?}"),
}
}
Expand Down
128 changes: 122 additions & 6 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use datafusion_expr::{
BuiltInWindowFunction,
};
use sqlparser::ast::{
Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr, WindowType,
DuplicateTreatment, Expr as SQLExpr, Function as SQLFunction, FunctionArg,
FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments,
NullTreatment, ObjectName, OrderByExpr, WindowType,
};
use std::str::FromStr;
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -79,23 +81,137 @@ fn find_closest_match(candidates: Vec<String>, target: &str) -> String {
.expect("No candidates provided.") // Panic if `candidates` argument is empty
}

/// Arguments to for a function call extracted from the SQL AST
#[derive(Debug)]
struct FunctionArgs {
/// Function name
name: ObjectName,
/// Argument expressions
args: Vec<FunctionArg>,
/// ORDER BY clause, if any
order_by: Vec<OrderByExpr>,
/// OVER clause, if any
over: Option<WindowType>,
/// FILTER clause, if any
filter: Option<Box<SQLExpr>>,
/// NULL treatment clause, if any
null_treatment: Option<NullTreatment>,
/// DISTINCT
distinct: bool,
}

impl FunctionArgs {
fn try_new(function: SQLFunction) -> Result<Self> {
let SQLFunction {
name,
args,
over,
filter,
mut null_treatment,
within_group,
} = function;

// Handle no argument form (aka `current_time` as opposed to `current_time()`)
let FunctionArguments::List(args) = args else {
return Ok(Self {
name,
args: vec![],
order_by: vec![],
over,
filter,
null_treatment,
distinct: false,
});
};

let FunctionArgumentList {
duplicate_treatment,
args,
clauses,
} = args;

let distinct = match duplicate_treatment {
Some(DuplicateTreatment::Distinct) => true,
Some(DuplicateTreatment::All) => false,
None => false,
};

// Pull out argument handling
let mut order_by = None;
for clause in clauses {
match clause {
FunctionArgumentClause::IgnoreOrRespectNulls(nt) => {
if null_treatment.is_some() {
return not_impl_err!(
"Calling {name}: Duplicated null treatment clause"
);
}
null_treatment = Some(nt);
}
FunctionArgumentClause::OrderBy(oby) => {
if order_by.is_some() {
return not_impl_err!("Calling {name}: Duplicated ORDER BY clause in function arguments");
}
order_by = Some(oby);
}
FunctionArgumentClause::Limit(limit) => {
return not_impl_err!(
"Calling {name}: LIMIT not supported in function arguments: {limit}"
)
}
FunctionArgumentClause::OnOverflow(overflow) => {
return not_impl_err!(
"Calling {name}: ON OVERFLOW not supported in function arguments: {overflow}"
)
}
FunctionArgumentClause::Having(having) => {
return not_impl_err!(
"Calling {name}: HAVING not supported in function arguments: {having}"
)
}
FunctionArgumentClause::Separator(sep) => {
return not_impl_err!(
"Calling {name}: SEPARATOR not supported in function arguments: {sep}"
)
}
}
}

if !within_group.is_empty() {
return not_impl_err!("WITHIN GROUP is not supported yet: {within_group:?}");
}

let order_by = order_by.unwrap_or_default();

Ok(Self {
name,
args,
order_by,
over,
filter,
null_treatment,
distinct,
})
}
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_function_to_expr(
&self,
function: SQLFunction,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let SQLFunction {
let function_args = FunctionArgs::try_new(function)?;
let FunctionArgs {
name,
args,
order_by,
over,
distinct,
filter,
null_treatment,
special: _, // true if not called with trailing parens
order_by,
} = function;
distinct,
} = function_args;

// If function is a window function (it has an OVER clause),
// it shouldn't have ordering requirement as function argument
Expand Down
Loading

0 comments on commit f054586

Please sign in to comment.