From ea01e56c3341dd4308a24e94091b86ee475ce224 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 5 Mar 2024 23:05:19 +0000 Subject: [PATCH] Add plugable handler for `CREATE FUNCTION` (#9333) * Add plugable function factory * cover `DROP FUNCTION` as well ... ... partially, as `SessionState` does not expose unregister_udf at the moment. * update documentation * fix doc test * Address PR comments (code organization) * Address PR comments (factory interface) * fix test after rebase * `remove`'s gone from the trait ... ... `DROP FUNCTION` will look for function name in all available registries (udf, udaf, udwf). `remove` may be necessary if UDaF and UDwF do not get `simplify` method from #9304. * Rename FunctionDefinition and export it ... FunctionDefinition already exists, DefinitionStatement makes more sense. * Update datafusion/expr/src/logical_plan/ddl.rs Co-authored-by: Andrew Lamb * Update datafusion/core/src/execution/context/mod.rs Co-authored-by: Andrew Lamb * Update datafusion/core/tests/user_defined/user_defined_scalar_functions.rs Co-authored-by: Andrew Lamb * Update datafusion/expr/src/logical_plan/ddl.rs Co-authored-by: Andrew Lamb * resolve part of follow up comments * Qualified functions are not supported anymore * update docs and todos * fix clippy * address additional comments * Add sqllogicteset for CREATE/DROP function * Add coverage for DROP FUNCTION IF EXISTS * fix multiline error * revert dialect back to generic in test ... ... as `create function` gets support in latest sqlparser. * fmt --------- Co-authored-by: Andrew Lamb --- datafusion/core/src/execution/context/mod.rs | 97 ++++++++++++- .../user_defined_scalar_functions.rs | 130 ++++++++++++++++- datafusion/expr/src/logical_plan/ddl.rs | 83 ++++++++++- datafusion/expr/src/logical_plan/mod.rs | 5 +- datafusion/expr/src/logical_plan/statement.rs | 3 +- datafusion/proto/src/logical_plan/mod.rs | 6 + datafusion/sql/src/statement.rs | 134 ++++++++++++++++-- .../test_files/create_function.slt | 60 ++++++++ 8 files changed, 498 insertions(+), 20 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/create_function.slt diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index f29c9137f976..e071c5c80e11 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -73,9 +73,10 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_expr::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan, - LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, + CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, DropView, + Explain, LogicalPlan, LogicalPlanBuilder, SetVariable, TableSource, TableType, + UNNAMED_TABLE, }; use crate::optimizer::OptimizerRule; use datafusion_sql::{ @@ -489,6 +490,8 @@ impl SessionContext { DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, DdlStatement::DropView(cmd) => self.drop_view(cmd).await, DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, + DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, + DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, }, // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { @@ -794,6 +797,55 @@ impl SessionContext { Ok(false) } + async fn create_function(&self, stmt: CreateFunction) -> Result { + let function = { + let state = self.state.read().clone(); + let function_factory = &state.function_factory; + + match function_factory { + Some(f) => f.create(state.config(), stmt).await?, + _ => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + } + }; + + match function { + RegisterFunction::Scalar(f) => { + self.state.write().register_udf(f)?; + } + RegisterFunction::Aggregate(f) => { + self.state.write().register_udaf(f)?; + } + RegisterFunction::Window(f) => { + self.state.write().register_udwf(f)?; + } + RegisterFunction::Table(name, f) => self.register_udtf(&name, f), + }; + + self.return_empty_dataframe() + } + + async fn drop_function(&self, stmt: DropFunction) -> Result { + // we don't know function type at this point + // decision has been made to drop all functions + let mut dropped = false; + dropped |= self.state.write().deregister_udf(&stmt.name)?.is_some(); + dropped |= self.state.write().deregister_udaf(&stmt.name)?.is_some(); + dropped |= self.state.write().deregister_udwf(&stmt.name)?.is_some(); + + // DROP FUNCTION IF EXISTS drops the specified function only if that + // function exists and in this way, it avoids error. While the DROP FUNCTION + // statement also performs the same function, it throws an + // error if the function does not exist. + + if !stmt.if_exists && !dropped { + exec_err!("Function does not exist") + } else { + self.return_empty_dataframe() + } + } + /// Registers a variable provider within this context. pub fn register_variable( &self, @@ -1261,7 +1313,30 @@ impl QueryPlanner for DefaultQueryPlanner { .await } } +/// A pluggable interface to handle `CREATE FUNCTION` statements +/// and interact with [SessionState] to registers new udf, udaf or udwf. + +#[async_trait] +pub trait FunctionFactory: Sync + Send { + /// Handles creation of user defined function specified in [CreateFunction] statement + async fn create( + &self, + state: &SessionConfig, + statement: CreateFunction, + ) -> Result; +} +/// Type of function to create +pub enum RegisterFunction { + /// Scalar user defined function + Scalar(Arc), + /// Aggregate user defined function + Aggregate(Arc), + /// Window user defined function + Window(Arc), + /// Table user defined function + Table(String, Arc), +} /// Execution context for registering data sources and executing queries. /// See [`SessionContext`] for a higher level API. /// @@ -1306,6 +1381,12 @@ pub struct SessionState { table_factories: HashMap>, /// Runtime environment runtime_env: Arc, + + /// [FunctionFactory] to support pluggable user defined function handler. + /// + /// It will be invoked on `CREATE FUNCTION` statements. + /// thus, changing dialect o PostgreSql is required + function_factory: Option>, } impl Debug for SessionState { @@ -1392,6 +1473,7 @@ impl SessionState { execution_props: ExecutionProps::new(), runtime_env: runtime, table_factories, + function_factory: None, }; // register built in functions @@ -1568,6 +1650,15 @@ impl SessionState { self } + /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements + pub fn with_function_factory( + mut self, + function_factory: Arc, + ) -> Self { + self.function_factory = Some(function_factory); + self + } + /// Replace the extension [`SerializerRegistry`] pub fn with_serializer_registry( mut self, diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 982fb0464ed5..d9b60134b3d9 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -17,10 +17,12 @@ use arrow::compute::kernels::numeric::add; use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array, + Array, ArrayRef, ArrowNativeTypeOp, Float32Array, Float64Array, Int32Array, + RecordBatch, UInt8Array, }; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; +use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; @@ -31,10 +33,12 @@ use datafusion_common::{ use datafusion_expr::simplify::ExprSimplifyResult; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ - create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, + create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; +use parking_lot::Mutex; +use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use rand::{thread_rng, Rng}; use std::any::Any; use std::iter; @@ -735,6 +739,128 @@ async fn verify_udf_return_type() -> Result<()> { Ok(()) } +#[derive(Debug, Default)] +struct MockFunctionFactory { + pub captured_expr: Mutex>, +} + +#[async_trait::async_trait] +impl FunctionFactory for MockFunctionFactory { + #[doc = r" Crates and registers a function from [CreateFunction] statement"] + #[must_use] + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + async fn create( + &self, + _config: &SessionConfig, + statement: CreateFunction, + ) -> datafusion::error::Result { + // In this example, we always create a function that adds its arguments + // with the name specified in `CREATE FUNCTION`. In a real implementation + // the body of the created UDF would also likely be a function of the contents + // of the `CreateFunction` + let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| { + let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?; + let base = + datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed"); + let exponent = + datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed"); + + let array = base + .iter() + .zip(exponent.iter()) + .map(|(base, exponent)| match (base, exponent) { + (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)), + _ => None, + }) + .collect::(); + Ok(datafusion_expr::ColumnarValue::from( + Arc::new(array) as arrow_array::ArrayRef + )) + }); + + let args = statement.args.unwrap(); + let mock_udf = create_udf( + &statement.name, + vec![args[0].data_type.clone(), args[1].data_type.clone()], + Arc::new(statement.return_type.unwrap()), + datafusion_expr::Volatility::Immutable, + mock_add, + ); + + // capture expression so we can verify + // it has been parsed + *self.captured_expr.lock() = statement.params.return_; + + Ok(RegisterFunction::Scalar(Arc::new(mock_udf))) + } +} + +#[tokio::test] +async fn create_scalar_function_from_sql_statement() -> Result<()> { + let function_factory = Arc::new(MockFunctionFactory::default()); + let runtime_config = RuntimeConfig::new(); + let runtime_environment = RuntimeEnv::new(runtime_config)?; + + let session_config = SessionConfig::new(); + let state = + SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) + .with_function_factory(function_factory.clone()); + + let ctx = SessionContext::new_with_state(state); + let options = SQLOptions::new().with_allow_ddl(false); + + let sql = r#" + CREATE FUNCTION better_add(DOUBLE, DOUBLE) + RETURNS DOUBLE + RETURN $1 + $2 + "#; + + // try to `create function` when sql options have allow ddl disabled + assert!(ctx.sql_with_options(sql, options).await.is_err()); + + // Create the `better_add` function dynamically via CREATE FUNCTION statement + assert!(ctx.sql(sql).await.is_ok()); + // try to `drop function` when sql options have allow ddl disabled + assert!(ctx + .sql_with_options("drop function better_add", options) + .await + .is_err()); + + ctx.sql("select better_add(2.0, 2.0)").await?.show().await?; + + // check if we sql expr has been converted to datafusion expr + let captured_expression = function_factory.captured_expr.lock().clone().unwrap(); + assert_eq!("$1 + $2", captured_expression.to_string()); + + // statement drops function + assert!(ctx.sql("drop function better_add").await.is_ok()); + // no function, it panics + assert!(ctx.sql("drop function better_add").await.is_err()); + // no function, it dies not care + assert!(ctx.sql("drop function if exists better_add").await.is_ok()); + // query should fail as there is no function + assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err()); + + // tests expression parsing + // if expression is not correct + let bad_expression_sql = r#" + CREATE FUNCTION bad_expression_fun(DOUBLE, DOUBLE) + RETURNS DOUBLE + RETURN $1 $3 + "#; + assert!(ctx.sql(bad_expression_sql).await.is_err()); + + // tests bad function definition + let bad_definition_sql = r#" + CREATE FUNCTION bad_definition_fun(DOUBLE, DOUBLE) + RET BAD_TYPE + RETURN $1 + $3 + "#; + assert!(ctx.sql(bad_definition_sql).await.is_err()); + + Ok(()) +} + fn create_udf_context() -> SessionContext { let ctx = SessionContext::new(); // register a custom UDF diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index e74992d99373..968c40c8bf62 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -22,12 +22,14 @@ use std::{ hash::{Hash, Hasher}, }; -use crate::{Expr, LogicalPlan}; +use crate::{Expr, LogicalPlan, Volatility}; +use arrow::datatypes::DataType; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ Constraints, DFSchemaRef, OwnedSchemaReference, OwnedTableReference, }; +use sqlparser::ast::Ident; /// Various types of DDL (CREATE / DROP) catalog manipulation #[derive(Clone, PartialEq, Eq, Hash)] @@ -48,6 +50,10 @@ pub enum DdlStatement { DropView(DropView), /// Drops a catalog schema DropCatalogSchema(DropCatalogSchema), + /// Create function statement + CreateFunction(CreateFunction), + /// Drop function statement + DropFunction(DropFunction), } impl DdlStatement { @@ -66,6 +72,8 @@ impl DdlStatement { DdlStatement::DropTable(DropTable { schema, .. }) => schema, DdlStatement::DropView(DropView { schema, .. }) => schema, DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema, + DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema, + DdlStatement::DropFunction(DropFunction { schema, .. }) => schema, } } @@ -81,6 +89,8 @@ impl DdlStatement { DdlStatement::DropTable(_) => "DropTable", DdlStatement::DropView(_) => "DropView", DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema", + DdlStatement::CreateFunction(_) => "CreateFunction", + DdlStatement::DropFunction(_) => "DropFunction", } } @@ -97,6 +107,8 @@ impl DdlStatement { DdlStatement::DropTable(_) => vec![], DdlStatement::DropView(_) => vec![], DdlStatement::DropCatalogSchema(_) => vec![], + DdlStatement::CreateFunction(_) => vec![], + DdlStatement::DropFunction(_) => vec![], } } @@ -156,6 +168,12 @@ impl DdlStatement { }) => { write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}") } + DdlStatement::CreateFunction(CreateFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } + DdlStatement::DropFunction(DropFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } } } } @@ -303,3 +321,66 @@ pub struct DropCatalogSchema { /// Dummy schema pub schema: DFSchemaRef, } + +/// Arguments passed to `CREATE FUNCTION` +/// +/// Note this meant to be the same as from sqlparser's [`sqlparser::ast::Statement::CreateFunction`] +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateFunction { + // TODO: There is open question should we expose sqlparser types or redefine them here? + // At the moment it make more sense to expose sqlparser types and leave + // user to convert them as needed + pub or_replace: bool, + pub temporary: bool, + pub name: String, + pub args: Option>, + pub return_type: Option, + pub params: CreateFunctionBody, + /// Dummy schema + pub schema: DFSchemaRef, +} +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct OperateFunctionArg { + // TODO: figure out how to support mode + // pub mode: Option, + pub name: Option, + pub data_type: DataType, + pub default_expr: Option, +} +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateFunctionBody { + /// LANGUAGE lang_name + pub language: Option, + /// IMMUTABLE | STABLE | VOLATILE + pub behavior: Option, + /// AS 'definition' + pub as_: Option, + /// RETURN expression + pub return_: Option, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub enum DefinitionStatement { + SingleQuotedDef(String), + DoubleDollarDef(String), +} + +impl From for DefinitionStatement { + fn from(value: sqlparser::ast::FunctionDefinition) -> Self { + match value { + sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => { + Self::SingleQuotedDef(s) + } + sqlparser::ast::FunctionDefinition::DoubleDollarDef(s) => { + Self::DoubleDollarDef(s) + } + } + } +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct DropFunction { + pub name: String, + pub if_exists: bool, + pub schema: DFSchemaRef, +} diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index f6e6000897a5..84781cb2e9ec 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -28,8 +28,9 @@ pub use builder::{ LogicalPlanBuilder, UNNAMED_TABLE, }; pub use ddl::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, + CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DefinitionStatement, + DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index 0b2f1bd383a0..f294e7d3ea4c 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::{self, Display}; - use datafusion_common::DFSchemaRef; +use std::fmt::{self, Display}; /// Various types of Statements. /// diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 7c9ead27e3b5..7acad1844d48 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1657,6 +1657,12 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error( "LogicalPlan serde is not yet implemented for DropCatalogSchema", )), + LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error( + "LogicalPlan serde is not yet implemented for CreateFunction", + )), + LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error( + "LogicalPlan serde is not yet implemented for DropFunction", + )), LogicalPlan::Statement(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Statement", )), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index dfac8367e912..35063a6cfa06 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -31,10 +31,10 @@ use arrow_schema::DataType; use datafusion_common::file_options::StatementOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - not_impl_err, plan_datafusion_err, plan_err, schema_err, unqualified_field_not_found, - Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, - OwnedTableReference, Result, ScalarValue, SchemaError, SchemaReference, - TableReference, ToDFSchema, + exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err, + unqualified_field_not_found, Column, Constraints, DFField, DFSchema, DFSchemaRef, + DataFusionError, OwnedTableReference, Result, ScalarValue, SchemaError, + SchemaReference, TableReference, ToDFSchema, }; use datafusion_expr::dml::{CopyOptions, CopyTo}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; @@ -43,12 +43,13 @@ use datafusion_expr::logical_plan::DdlStatement; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, - CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, - DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, EmptyRelation, - Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare, - SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, + CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody, + CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema, + DropFunction, DropTable, DropView, EmptyRelation, Explain, ExprSchemable, Filter, + LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, + Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, - WriteOp, + Volatility, WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ @@ -626,8 +627,121 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }); Ok(LogicalPlan::Statement(statement)) } + Statement::CreateFunction { + or_replace, + temporary, + name, + args, + return_type, + params, + } => { + let return_type = match return_type { + Some(t) => Some(self.convert_data_type(&t)?), + None => None, + }; + let mut planner_context = PlannerContext::new(); + let empty_schema = &DFSchema::empty(); + + let args = match args { + Some(function_args) => { + let function_args = function_args + .into_iter() + .map(|arg| { + let data_type = self.convert_data_type(&arg.data_type)?; + + let default_expr = match arg.default_expr { + Some(expr) => Some(self.sql_to_expr( + expr, + empty_schema, + &mut planner_context, + )?), + None => None, + }; + Ok(OperateFunctionArg { + name: arg.name, + default_expr, + data_type, + }) + }) + .collect::>>(); + Some(function_args?) + } + None => None, + }; + // at the moment functions can't be qualified `schema.name` + let name = match &name.0[..] { + [] => exec_err!("Function should have name")?, + [n] => n.value.clone(), + [..] => not_impl_err!("Qualified functions are not supported")?, + }; + // + // convert resulting expression to data fusion expression + // + let arg_types = args.as_ref().map(|arg| { + arg.iter().map(|t| t.data_type.clone()).collect::>() + }); + let mut planner_context = PlannerContext::new() + .with_prepare_param_data_types(arg_types.unwrap_or_default()); + + let result_expression = match params.return_ { + Some(r) => Some(self.sql_to_expr( + r, + &DFSchema::empty(), + &mut planner_context, + )?), + None => None, + }; + + let params = CreateFunctionBody { + language: params.language, + behavior: params.behavior.map(|b| match b { + ast::FunctionBehavior::Immutable => Volatility::Immutable, + ast::FunctionBehavior::Stable => Volatility::Stable, + ast::FunctionBehavior::Volatile => Volatility::Volatile, + }), + as_: params.as_.map(|m| m.into()), + return_: result_expression, + }; - _ => not_impl_err!("Unsupported SQL statement: {sql:?}"), + let statement = DdlStatement::CreateFunction(CreateFunction { + or_replace, + temporary, + name, + return_type, + args, + params, + schema: DFSchemaRef::new(DFSchema::empty()), + }); + + Ok(LogicalPlan::Ddl(statement)) + } + Statement::DropFunction { + if_exists, + func_desc, + .. + } => { + // according to postgresql documentation it can be only one function + // specified in drop statement + if let Some(desc) = func_desc.first() { + // at the moment functions can't be qualified `schema.name` + let name = match &desc.name.0[..] { + [] => exec_err!("Function should have name")?, + [n] => n.value.clone(), + [..] => not_impl_err!("Qualified functions are not supported")?, + }; + let statement = DdlStatement::DropFunction(DropFunction { + if_exists, + name, + schema: DFSchemaRef::new(DFSchema::empty()), + }); + Ok(LogicalPlan::Ddl(statement)) + } else { + exec_err!("Function name not provided") + } + } + _ => { + not_impl_err!("Unsupported SQL statement: {sql:?}") + } } } diff --git a/datafusion/sqllogictest/test_files/create_function.slt b/datafusion/sqllogictest/test_files/create_function.slt new file mode 100644 index 000000000000..baa40ac64afc --- /dev/null +++ b/datafusion/sqllogictest/test_files/create_function.slt @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +## SQL tests for CREATE / DROP FUNCTION +## +## Note that DataFusion provides a pluggable system for creating functions +## but has no built in support for doing so. + +# Use PostgresSQL dialect (until we upgrade to sqlparser 0.44, where CREATE FUNCTION) +# is supported in the Generic dialect (the default) +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# Create function will fail unless a user supplied function factory is supplied +statement error DataFusion error: Invalid or Unsupported Configuration: Function factory has not been configured +CREATE FUNCTION foo (DOUBLE) RETURNS DOUBLE RETURN $1 + $2; + +# multi-part identifiers are not supported +statement error DataFusion error: This feature is not implemented: Qualified functions are not supported +CREATE FUNCTION foo.bar (DOUBLE) RETURNS DOUBLE RETURN $1 + $2; + +statement error DataFusion error: This feature is not implemented: Qualified functions are not supported +DROP FUNCTION foo.bar; + +# Show it is possible to drop existing (UDF) functions +query I +select abs(-1); +---- +1 + +# drop the function +statement ok +DROP FUNCTION abs; + +# now the the query errors +query error Invalid function 'abs'. +select abs(-1); + +# Can't drop the function again +statement error DataFusion error: Execution error: Function does not exist +DROP FUNCTION abs; + +# But DROP IF EXISTS does not error +statement ok +DROP FUNCTION IF EXISTS abs;