Skip to content

Commit

Permalink
Implement SHOW FUNCTIONS (apache#13799)
Browse files Browse the repository at this point in the history
* introduce rid for different signature

* implement show functions syntax

* add syntax example

* avoid duplicate join

* fix clippy

* show function_type instead of routine_type

* add some doc and comments
  • Loading branch information
goldmedal authored Dec 21, 2024
1 parent 7089c64 commit ade14e7
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 82 deletions.
67 changes: 41 additions & 26 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_array::builder::BooleanBuilder;
use arrow_array::builder::{BooleanBuilder, UInt8Builder};
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl InformationSchemaConfig {
return_type,
"SCALAR",
udf.documentation().map(|d| d.description.to_string()),
udf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -266,6 +267,7 @@ impl InformationSchemaConfig {
return_type,
"AGGREGATE",
udaf.documentation().map(|d| d.description.to_string()),
udaf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -285,6 +287,7 @@ impl InformationSchemaConfig {
return_type,
"WINDOW",
udwf.documentation().map(|d| d.description.to_string()),
udwf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -308,7 +311,8 @@ impl InformationSchemaConfig {
args: Option<&Vec<(String, String)>>,
arg_types: Vec<String>,
return_type: Option<String>,
is_variadic: bool| {
is_variadic: bool,
rid: u8| {
for (position, type_name) in arg_types.iter().enumerate() {
let param_name =
args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
Expand All @@ -322,6 +326,7 @@ impl InformationSchemaConfig {
type_name,
None::<&str>,
is_variadic,
rid,
);
}
if let Some(return_type) = return_type {
Expand All @@ -335,48 +340,52 @@ impl InformationSchemaConfig {
return_type.as_str(),
None::<&str>,
false,
rid,
);
}
};

for (func_name, udf) in udfs {
let args = udf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udf_args_and_return_types(udf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udf.signature()),
rid as u8,
);
}
}

for (func_name, udaf) in udafs {
let args = udaf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udaf_args_and_return_types(udaf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udaf.signature()),
rid as u8,
);
}
}

for (func_name, udwf) in udwfs {
let args = udwf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udwf_args_and_return_types(udwf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udwf.signature()),
rid as u8,
);
}
}
Expand Down Expand Up @@ -1095,6 +1104,7 @@ impl InformationSchemaRoutines {
Field::new("data_type", DataType::Utf8, true),
Field::new("function_type", DataType::Utf8, true),
Field::new("description", DataType::Utf8, true),
Field::new("syntax_example", DataType::Utf8, true),
]));

Self { schema, config }
Expand All @@ -1114,6 +1124,7 @@ impl InformationSchemaRoutines {
data_type: StringBuilder::new(),
function_type: StringBuilder::new(),
description: StringBuilder::new(),
syntax_example: StringBuilder::new(),
}
}
}
Expand All @@ -1131,6 +1142,7 @@ struct InformationSchemaRoutinesBuilder {
data_type: StringBuilder,
function_type: StringBuilder,
description: StringBuilder,
syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
Expand All @@ -1145,6 +1157,7 @@ impl InformationSchemaRoutinesBuilder {
data_type: Option<impl AsRef<str>>,
function_type: impl AsRef<str>,
description: Option<impl AsRef<str>>,
syntax_example: Option<impl AsRef<str>>,
) {
self.specific_catalog.append_value(catalog_name.as_ref());
self.specific_schema.append_value(schema_name.as_ref());
Expand All @@ -1157,6 +1170,7 @@ impl InformationSchemaRoutinesBuilder {
self.data_type.append_option(data_type.as_ref());
self.function_type.append_value(function_type.as_ref());
self.description.append_option(description);
self.syntax_example.append_option(syntax_example);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1174,6 +1188,7 @@ impl InformationSchemaRoutinesBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.function_type.finish()),
Arc::new(self.description.finish()),
Arc::new(self.syntax_example.finish()),
],
)
.unwrap()
Expand Down Expand Up @@ -1222,6 +1237,12 @@ impl InformationSchemaParameters {
Field::new("data_type", DataType::Utf8, false),
Field::new("parameter_default", DataType::Utf8, true),
Field::new("is_variadic", DataType::Boolean, false),
// `rid` (short for `routine id`) is used to differentiate parameters from different signatures
// (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
// For example, the following signatures have different `rid` values:
// - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
// - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
Field::new("rid", DataType::UInt8, false),
]));

Self { schema, config }
Expand All @@ -1239,7 +1260,7 @@ impl InformationSchemaParameters {
data_type: StringBuilder::new(),
parameter_default: StringBuilder::new(),
is_variadic: BooleanBuilder::new(),
inserted: HashSet::new(),
rid: UInt8Builder::new(),
}
}
}
Expand All @@ -1255,8 +1276,7 @@ struct InformationSchemaParametersBuilder {
data_type: StringBuilder,
parameter_default: StringBuilder,
is_variadic: BooleanBuilder,
// use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
inserted: HashSet<(String, u64, String, String)>,
rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
Expand All @@ -1272,25 +1292,19 @@ impl InformationSchemaParametersBuilder {
data_type: impl AsRef<str>,
parameter_default: Option<impl AsRef<str>>,
is_variadic: bool,
rid: u8,
) {
let key = (
specific_name.as_ref().to_string(),
ordinal_position,
parameter_mode.as_ref().to_string(),
data_type.as_ref().to_string(),
);
if self.inserted.insert(key) {
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
}
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
self.rid.append_value(rid);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1306,6 +1320,7 @@ impl InformationSchemaParametersBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.parameter_default.finish()),
Arc::new(self.is_variadic.finish()),
Arc::new(self.rid.finish()),
],
)
.unwrap()
Expand Down
88 changes: 88 additions & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.show_columns_to_plan(extended, full, table_name)
}

Statement::ShowFunctions { filter, .. } => {
self.show_functions_to_plan(filter)
}

Statement::Insert(Insert {
or,
into,
Expand Down Expand Up @@ -1980,6 +1984,90 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
}

/// Rewrite `SHOW FUNCTIONS [LIKE '<pattern>']` into a `SELECT` statement and plan that instead.
/// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
///
/// The output columns:
/// - function_name: The name of function
/// - return_type: The return type of the function
/// - parameters: The name of parameters (ordered by the ordinal position)
/// - parameter_types: The type of parameters (ordered by the ordinal position)
/// - function_type: The `function_type` column of `information_schema.routines`
/// - description: The description of the function (the description defined in the document)
/// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
///
/// Only the `LIKE` filter is supported; any other filter results in a plan error.
/// Errors from parsing or planning the generated query are propagated to the caller.
fn show_functions_to_plan(
    &self,
    filter: Option<ShowStatementFilter>,
) -> Result<LogicalPlan> {
    let where_clause = match filter {
        Some(ShowStatementFilter::Like(like)) => {
            // The pattern is embedded in a single-quoted SQL string literal below, so any
            // single quote inside it must be doubled. Otherwise the pattern could terminate
            // the literal early and alter the generated query (or inject extra statements,
            // which would trip the `assert_eq!` further down).
            let pattern = like.replace('\'', "''");
            format!("WHERE p.function_name like '{pattern}'")
        }
        Some(_) => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
        None => String::new(),
    };

    // The inner query joins the `IN` rows of `information_schema.parameters` with the
    // `OUT` rows of the same function and the same `rid`, then aggregates the `IN` rows
    // into arrays, producing one row per (function, return type, rid). The outer query
    // attaches the routine-level columns from `information_schema.routines`.
    let query = format!(
        r#"
SELECT DISTINCT
p.*,
r.function_type function_type,
r.description description,
r.syntax_example syntax_example
FROM
(
SELECT
i.specific_name function_name,
o.data_type return_type,
array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
FROM (
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'IN'
) i
JOIN
(
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'OUT'
) o
ON i.specific_catalog = o.specific_catalog
AND i.specific_schema = o.specific_schema
AND i.specific_name = o.specific_name
AND i.rid = o.rid
GROUP BY 1, 2, i.rid
) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
"#
    );
    let mut rewrite = DFParser::parse_sql(&query)?;
    // With the pattern escaped, the generated text is always exactly one statement.
    assert_eq!(rewrite.len(), 1);
    self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
}

fn show_create_table_to_plan(
&self,
sql_table_name: ObjectName,
Expand Down
Loading

0 comments on commit ade14e7

Please sign in to comment.