From ac44a59a82ec3453639bae3de499ee01b7a5a8bc Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Mon, 21 Aug 2023 16:35:23 -0700 Subject: [PATCH 1/7] docs: fill out expr page --- docs/source/library-user-guide/adding-udfs.md | 105 +++++++++- .../library-user-guide/working-with-exprs.md | 198 +++++++++++++++++- 2 files changed, 301 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index 45d5afc1f07a..7a9e27a4eb70 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -19,4 +19,107 @@ # Adding User Defined Functions: Scalar/Window/Aggregate -Coming Soon +User Defined Functions (UDFs) are functions that can be used in the context of DataFusion execution. + +This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs. + +| UDF Type | Description | +| --------- | ---------------------------------------------------------------------------------------------------------- | +| Scalar | A function that takes a row of data and returns a single value. | +| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | +| Aggregate | A function that takes a group of rows and returns a single value. | + +First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different types of UDFs. + +## Adding a Scalar UDF + +A Scalar UDF is a function that takes a row of data and returns a single value. For example, this function takes a single i64 and returns a single i64 with 1 added to it: + +```rust +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array}; +use datafusion::common::Result; + +use datafusion::common::cast::as_int32_array; + +pub fn add_one(args: &[ArrayRef]) -> Result { + let i64s = as_int32_array(&args[0])?; + + let array = i64s + .iter() + .map(|sequence| match sequence { + Some(value) => Some(value + 1), + None => None, + }) + .collect::(); + + Ok(Arc::new(array)) +} +``` + +For brevity, we'll skipped some error handling, but e.g. you may want to check that `args.len()` is the expected number of arguments. + +This "works" in isolation, i.e. if you have a slice of `ArrayRef`s, you can call `add_one` and it will return a new `ArrayRef` with 1 added to each value. + +```rust +let input = vec![Some(1), None, Some(3)]; +let input = Arc::new(Int64Array::from(input)) as ArrayRef; + +let result = add_one(&[input]).unwrap(); +let result = result.as_any().downcast_ref::().unwrap(); + +assert_eq!(result, &Int64Array::from(vec![Some(2), None, Some(4)])); +``` + +The challenge however is that DataFusion doesn't know about this function. We need to register it with DataFusion so that it can be used in the context of a query. + +### Registering a Scalar UDF + +To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `ExecutionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier. + +```rust +let udf = create_udf( + "add_one", + vec![DataType::Int64], + Arc::new(DataType::Int64), + Volatility::Immutable, + make_scalar_function(add_one), +); +``` + +A few things to note: + +- The first argument is the name of the function. This is the name that will be used in SQL queries. +- The second argument is a vector of `DataType`s. 
This is the list of argument types that the function accepts. I.e. in this case, the function accepts a single `Int64` argument. +- The third argument is the return type of the function. I.e. in this case, the function returns an `Int64`. +- The fourth argument is the volatility of the function. This is an enum with three options: `Immutable`, `Stable`, and `Volatile`. This is used to determine if the function can be cached in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input. +- The fifth argument is the function implementation. This is the function that we defined above. + +That gives us a `ScalarUDF` that we can register with the `ExecutionContext`: + +```rust +let mut ctx = ExecutionContext::new(); + +ctx.register_udf(udf); +``` + +At this point, the following could is expected to work: + +```rust +let sql = "SELECT add_one(1)"; + +let df = ctx.sql(&sql).await.unwrap(); +``` + +## Adding a Window UDF + +Scalar UDFs are functions that take a row of data and return a single value. Window UDFs are similar, but they also have access to the rows around them. Access to the the proximal rows is helpful, but adds some complexity to the implementation. + +Body coming soon. + +## Adding an Aggregate UDF + +Aggregate UDFs are functions that take a group of rows and return a single value. These are akin to SQL's `SUM` or `COUNT` functions. + +Body coming soon. diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index b1a26cdfcb51..b8eadd231269 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -19,4 +19,200 @@ # Working with Exprs -Coming Soon + + +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation. + +For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. + +As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. + +As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF. + +## A Scalar UDF Example + +Let's start by creating our own `Expr` in the form of a Scalar UDF. This UDF will simply add 1 to the input value. + +```rust +pub fn add_one(args: &[ArrayRef]) -> Result { + let i32s = as_int64_array(&args[0])?; + + let array = i32s + .iter() + .map(|sequence| match sequence { + Some(value) => Some(value + 1), + None => None, + }) + .collect::(); + + Ok(Arc::new(array)) +} +``` + +And our `ScalarUDF` would look like this. Please see the section on [adding UDFs](./adding-udfs.md) for more information on how to create a `ScalarUDF`. + +```rust +let add_one = create_udf( + "add_one", + vec![DataType::Int64], + Arc::new(DataType::Int64), + Volatility::Immutable, + make_scalar_function(add_one), +); +``` + +## Creating Exprs Programmatically + +In addition to SQL strings, you can create `Expr`s programatically. 
This is common if you're working with a DataFrame vs. a SQL string. A simple example is: + +```rust +use datafusion::prelude::*; + +let expr = lit(5) + lit(5); +``` + +This is obviously a very simple example, but it shows how you can create an `Expr` from a literal value and sets us up later for how to simplify `Expr`s. You can also create `Expr`s from column references and operators: + +```rust +use datafusion::prelude::*; + +let expr = col("a") + col("b"); +``` + +In fact, the `add_one` we created earlier is also an `Expr`. And because it's so simple, we'll use it as fodder for how to rewrite `Expr`s. + +## Rewriting Exprs + +Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including: + +- Simplifying `Expr`s to make them easier to evaluate +- Optimizing `Expr`s to make them faster to evaluate +- Converting `Expr`s to other forms, e.g. converting a `BinaryExpr` to a `CastExpr` + +In our example, we'll use rewriting to update our `add_one` UDF, to be rewritten as a `BinaryExpr` with a `Literal` of 1. We're effectively inlining the UDF. + +### Rewriting with `transform` + +To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result`. If the expression is _not_ to be rewritten `Transformed::No` is used to wrap the original `Expr`. If the expression _is_ to be rewritten, `Transformed::Yes` is used to wrap the new `Expr`. + +```rust +fn rewrite_add_one(expr: Expr) -> Result { + expr.transform(&|expr| { + Ok(match expr { + Expr::ScalarUDF(scalar_fun) => { + // rewrite the expression if the function is "add_one", otherwise return the original expression + if scalar_fun.fun.name == "add_one" { + let input_arg = scalar_fun.args[0].clone(); + + let new_expression = BinaryExpr::new( + Box::new(input_arg), + datafusion::logical_expr::Operator::Plus, + Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Int64(Some( + 1, + )))), + ); + + Transformed::Yes(Expr::BinaryExpr(new_expression)) + } else { + // a scalar function that is not "add_one" is not rewritten + Transformed::No(Expr::ScalarUDF(scalar_fun)) + } + } + _ => Transformed::No(expr), // not a scalar function, so not rewritten + }) + }) +} +``` + +### Creating an `OptimizerRule` + +In DataFusion, an `OptimizerRule` is a trait that supports rewriting `Expr`s. It follows DataFusion's general mantra of trait implementations to drive behavior. + +We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. The `OptimizerRule` trait has two methods: + +- `name` - returns the name of the rule +- `try_optimize` - takes a `LogicalPlan` and returns an `Option`. If the rule is able to optimize the plan, it returns `Some(LogicalPlan)` with the optimized plan. If the rule is not able to optimize the plan, it returns `None`. 
+ +```rust +struct AddOneInliner {} + +impl OptimizerRule for AddOneInliner { + fn name(&self) -> &str { + "add_one_inliner" + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result> { + // recurse down and optimize children first + let optimized_plan = utils::optimize_children(self, plan, config)?; + + match optimized_plan { + Some(LogicalPlan::Projection(projection)) => { + let proj_expression = projection + .expr + .iter() + .map(|expr| rewrite_add_one(expr.clone())) + .collect::>>()?; + + let proj = Projection::try_new(proj_expression, projection.input)?; + + Ok(Some(LogicalPlan::Projection(proj))) + } + Some(optimized_plan) => Ok(Some(optimized_plan)), + None => match plan { + LogicalPlan::Projection(projection) => { + let proj_expression = projection + .expr + .iter() + .map(|expr| rewrite_add_one(expr.clone())) + .collect::>>()?; + + let proj = Projection::try_new(proj_expression, projection.input.clone())?; + + Ok(Some(LogicalPlan::Projection(proj))) + } + _ => Ok(None), + }, + } + } +} +``` + +Note the use of `rewrite_add_one` which is mapped over the `expr` of the `Projection`. This is the function we wrote earlier that takes an `Expr` and returns a `Result`. + +We're almost there. Let's just test our rule works properly. + +## Testing the Rule + +Testing the rule is fairly simple, we can create a SessionState with our rule and then create a DataFrame and run a query. The logical plan will be optimized by our rule. + +```rust +use datafusion::prelude::*; + +let rules = Arc::new(AddOneInliner {}); +let state = ctx.state().with_optimizer_rules(vec![rules]); + +let ctx = SessionContext::with_state(state); +ctx.register_udf(add_one); + +let sql = "SELECT add_one(1) AS added_one"; +let plan = ctx.sql(sql).await?.logical_plan(); + +println!("{:?}", plan); +``` + +This results in the following output: + +```text +Projection: Int64(1) + Int64(1) AS added_one + EmptyRelation +``` + +I.e. the `add_one` UDF has been inlined into the projection. + +## Conclusion + +In this guide, we've seen how to create `Expr`s programmatically and how to rewrite them. This is useful for simplifying and optimizing `Expr`s. We've also seen how to test our rule to ensure it works properly. From 242470c30dd9b9b4a6e52f5dcb97e7ca6e76170c Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Tue, 22 Aug 2023 09:15:03 -0700 Subject: [PATCH 2/7] fix: fixup a few issues --- docs/source/library-user-guide/adding-udfs.md | 6 +++--- docs/source/library-user-guide/working-with-exprs.md | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index 7a9e27a4eb70..5344705543c5 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -76,7 +76,7 @@ The challenge however is that DataFusion doesn't know about this function. We ne ### Registering a Scalar UDF -To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `ExecutionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier. +To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `SessionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier. 
```rust let udf = create_udf( @@ -96,10 +96,10 @@ A few things to note: - The fourth argument is the volatility of the function. This is an enum with three options: `Immutable`, `Stable`, and `Volatile`. This is used to determine if the function can be cached in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input. - The fifth argument is the function implementation. This is the function that we defined above. -That gives us a `ScalarUDF` that we can register with the `ExecutionContext`: +That gives us a `ScalarUDF` that we can register with the `SessionContext`: ```rust -let mut ctx = ExecutionContext::new(); +let mut ctx = SessionContext::new(); ctx.register_udf(udf); ``` diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index b8eadd231269..fc86052a05d5 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -39,7 +39,7 @@ pub fn add_one(args: &[ArrayRef]) -> Result { let array = i32s .iter() - .map(|sequence| match sequence { + .map(|item| match item { Some(value) => Some(value + 1), None => None, }) @@ -52,7 +52,7 @@ pub fn add_one(args: &[ArrayRef]) -> Result { And our `ScalarUDF` would look like this. Please see the section on [adding UDFs](./adding-udfs.md) for more information on how to create a `ScalarUDF`. ```rust -let add_one = create_udf( +let add_one_udf = create_udf( "add_one", vec![DataType::Int64], Arc::new(DataType::Int64), @@ -79,7 +79,7 @@ use datafusion::prelude::*; let expr = col("a") + col("b"); ``` -In fact, the `add_one` we created earlier is also an `Expr`. And because it's so simple, we'll use it as fodder for how to rewrite `Expr`s. +In fact, the `add_one_udf` we created earlier is also an `Expr`. And because it's so simple, we'll use it as fodder for how to rewrite `Expr`s. ## Rewriting Exprs From d70c2e2bbcc3ec1ef7f9deaeec4183ed709b8b36 Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Tue, 22 Aug 2023 09:20:06 -0700 Subject: [PATCH 3/7] docs: links to examples --- docs/source/library-user-guide/adding-udfs.md | 10 +++++----- docs/source/library-user-guide/working-with-exprs.md | 5 +++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index 5344705543c5..b57d9089bc69 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -23,11 +23,11 @@ User Defined Functions (UDFs) are functions that can be used in the context of D This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs. -| UDF Type | Description | -| --------- | ---------------------------------------------------------------------------------------------------------- | -| Scalar | A function that takes a row of data and returns a single value. | -| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | -| Aggregate | A function that takes a group of rows and returns a single value. 
| +| UDF Type | Description | Example | +| --------- | ---------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------- | +| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs](../../../datafusion-examples/examples/simple_udf.rs) | +| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs](../../../datafusion-examples/examples/simple_udwf.rs) | +| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs](../../../datafusion-examples/examples/simple_udaf.rs) | First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different types of UDFs. diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index fc86052a05d5..6508508ee2ef 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -29,6 +29,11 @@ As another example, the SQL expression `a + b * c` would be represented as an `E As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF. +There are also executable examples for working with `Expr`s: + +- [rewrite_expr.rs](../../../datafusion-examples/examples/catalog.rs) +- [expr_api.rs](../../../datafusion-examples/examples/expr_api.rs) + ## A Scalar UDF Example Let's start by creating our own `Expr` in the form of a Scalar UDF. This UDF will simply add 1 to the input value. From 1fa302814f9b07942668da1762793c6068b35ad8 Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Thu, 24 Aug 2023 06:13:16 -0700 Subject: [PATCH 4/7] fix: impv variable name --- docs/source/library-user-guide/working-with-exprs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index 6508508ee2ef..2ee42bd24c41 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -40,9 +40,9 @@ Let's start by creating our own `Expr` in the form of a Scalar UDF. 
This UDF wil ```rust pub fn add_one(args: &[ArrayRef]) -> Result { - let i32s = as_int64_array(&args[0])?; + let i64s = as_int64_array(&args[0])?; - let array = i32s + let array = i64s .iter() .map(|item| match item { Some(value) => Some(value + 1), From d716c8296a6da8576b843e99a4ab6b3b7c404404 Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Thu, 24 Aug 2023 08:43:29 -0700 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Andrew Lamb --- docs/source/library-user-guide/adding-udfs.md | 4 ++-- .../source/library-user-guide/working-with-exprs.md | 13 ++++--------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index b57d9089bc69..20f3987412e1 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -44,7 +44,7 @@ use datafusion::common::Result; use datafusion::common::cast::as_int32_array; pub fn add_one(args: &[ArrayRef]) -> Result { - let i64s = as_int32_array(&args[0])?; + let i64s = as_int64_array(&args[0])?; let array = i64s .iter() @@ -104,7 +104,7 @@ let mut ctx = SessionContext::new(); ctx.register_udf(udf); ``` -At this point, the following could is expected to work: +At this point, you can use the `add_one` function in your query: ```rust let sql = "SELECT add_one(1)"; diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index 2ee42bd24c41..7a1c6001c114 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -21,7 +21,7 @@ -`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation. +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation, and follows the standard "expression tree" abstraction found in most compilers and databases. For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. @@ -73,6 +73,7 @@ In addition to SQL strings, you can create `Expr`s programatically. This is comm ```rust use datafusion::prelude::*; +// Represent `5 + 5` let expr = lit(5) + lit(5); ``` @@ -109,13 +110,7 @@ fn rewrite_add_one(expr: Expr) -> Result { if scalar_fun.fun.name == "add_one" { let input_arg = scalar_fun.args[0].clone(); - let new_expression = BinaryExpr::new( - Box::new(input_arg), - datafusion::logical_expr::Operator::Plus, - Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Int64(Some( - 1, - )))), - ); + let new_expression = input_arg + lit(1i64); Transformed::Yes(Expr::BinaryExpr(new_expression)) } else { @@ -131,7 +126,7 @@ fn rewrite_add_one(expr: Expr) -> Result { ### Creating an `OptimizerRule` -In DataFusion, an `OptimizerRule` is a trait that supports rewriting `Expr`s. It follows DataFusion's general mantra of trait implementations to drive behavior. +In DataFusion, an `OptimizerRule` is a trait that supports rewriting`Expr`s that appear in various parts of the `LogicalPlan`. It follows DataFusion's general mantra of trait implementations to drive behavior. We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. 
The `OptimizerRule` trait has two methods: From 4d87440c308e50fabcb1d83ebb9aeb78787a180c Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Thu, 24 Aug 2023 20:01:16 -0700 Subject: [PATCH 6/7] fix: feedback updates --- .../library-user-guide/working-with-exprs.md | 105 +++++------------- 1 file changed, 25 insertions(+), 80 deletions(-) diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index 7a1c6001c114..ec2a15faa8e2 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -21,7 +21,7 @@ -`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation, and follows the standard "expression tree" abstraction found in most compilers and databases. +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation, and follows the standard "expression tree" abstraction found in most compilers and databases. For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. @@ -36,25 +36,9 @@ There are also executable examples for working with `Expr`s: ## A Scalar UDF Example -Let's start by creating our own `Expr` in the form of a Scalar UDF. This UDF will simply add 1 to the input value. +We'll use a `ScalarUDF` expression as our example. This necessitates implementing an actual UDF, and for ease we'll use the same example from the [adding UDFs](./adding-udfs.md) guide. -```rust -pub fn add_one(args: &[ArrayRef]) -> Result { - let i64s = as_int64_array(&args[0])?; - - let array = i64s - .iter() - .map(|item| match item { - Some(value) => Some(value + 1), - None => None, - }) - .collect::(); - - Ok(Arc::new(array)) -} -``` - -And our `ScalarUDF` would look like this. Please see the section on [adding UDFs](./adding-udfs.md) for more information on how to create a `ScalarUDF`. +So assuming you've written that function, you can use it to create an `Expr`: ```rust let add_one_udf = create_udf( @@ -64,28 +48,15 @@ let add_one_udf = create_udf( Volatility::Immutable, make_scalar_function(add_one), ); -``` - -## Creating Exprs Programmatically -In addition to SQL strings, you can create `Expr`s programatically. This is common if you're working with a DataFrame vs. a SQL string. A simple example is: +// make the expr `add_one(5)` +let expr = add_one_udf.call(vec![lit(5)]); -```rust -use datafusion::prelude::*; - -// Represent `5 + 5` -let expr = lit(5) + lit(5); +// make the expr `add_one(my_column)` +let expr = add_one_udf.call(vec![col("my_column")]); ``` -This is obviously a very simple example, but it shows how you can create an `Expr` from a literal value and sets us up later for how to simplify `Expr`s. You can also create `Expr`s from column references and operators: - -```rust -use datafusion::prelude::*; - -let expr = col("a") + col("b"); -``` - -In fact, the `add_one_udf` we created earlier is also an `Expr`. And because it's so simple, we'll use it as fodder for how to rewrite `Expr`s. +If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md). 
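To see these pieces end-to-end before moving on to rewriting, here is a minimal, self-contained sketch that builds the `Expr` and evaluates it through the DataFrame API. The CSV path `my_data.csv` and the column name `my_column` are placeholders, and the exact import paths (e.g. for `make_scalar_function`) are assumptions that can shift between DataFusion versions, so treat this as a sketch rather than copy-paste-ready code:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::cast::as_int64_array;
use datafusion::common::Result;
use datafusion::logical_expr::Volatility;
use datafusion::physical_expr::functions::make_scalar_function;
use datafusion::prelude::*;

// the same `add_one` kernel used throughout this guide
fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> {
    let i64s = as_int64_array(&args[0])?;
    let new_array: Int64Array = i64s.iter().map(|value| value.map(|v| v + 1)).collect();
    Ok(Arc::new(new_array))
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    let add_one_udf = create_udf(
        "add_one",
        vec![DataType::Int64],
        Arc::new(DataType::Int64),
        Volatility::Immutable,
        make_scalar_function(add_one),
    );

    // register the UDF so it can also be called by name from SQL queries
    ctx.register_udf(add_one_udf.clone());

    // build the expr `add_one(my_column)` and evaluate it with the DataFrame API
    let df = ctx
        .read_csv("my_data.csv", CsvReadOptions::new())
        .await?
        .select(vec![add_one_udf.call(vec![col("my_column")])])?;

    df.show().await?;

    Ok(())
}
```

The same `Expr` value works unchanged whether it ends up in a DataFrame projection, a SQL query, or a `LogicalPlan` you build by hand, which is what makes the rewriting techniques in the next section broadly useful.
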
## Rewriting Exprs @@ -105,20 +76,13 @@ To implement the inlining, we'll need to write a function that takes an `Expr` a fn rewrite_add_one(expr: Expr) -> Result { expr.transform(&|expr| { Ok(match expr { - Expr::ScalarUDF(scalar_fun) => { - // rewrite the expression if the function is "add_one", otherwise return the original expression - if scalar_fun.fun.name == "add_one" { - let input_arg = scalar_fun.args[0].clone(); - - let new_expression = input_arg + lit(1i64); - - Transformed::Yes(Expr::BinaryExpr(new_expression)) - } else { - // a scalar function that is not "add_one" is not rewritten - Transformed::No(Expr::ScalarUDF(scalar_fun)) - } + Expr::ScalarUDF(scalar_fun) if scalar_fun.fun.name == "add_one" => { + let input_arg = scalar_fun.args[0].clone(); + let new_expression = input_arg + lit(1i64); + + Transformed::Yes(new_expression) } - _ => Transformed::No(expr), // not a scalar function, so not rewritten + _ => Transformed::No(expr), }) }) } @@ -126,7 +90,7 @@ fn rewrite_add_one(expr: Expr) -> Result { ### Creating an `OptimizerRule` -In DataFusion, an `OptimizerRule` is a trait that supports rewriting`Expr`s that appear in various parts of the `LogicalPlan`. It follows DataFusion's general mantra of trait implementations to drive behavior. +In DataFusion, an `OptimizerRule` is a trait that supports rewriting`Expr`s that appear in various parts of the `LogicalPlan`. It follows DataFusion's general mantra of trait implementations to drive behavior. We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. The `OptimizerRule` trait has two methods: @@ -146,42 +110,23 @@ impl OptimizerRule for AddOneInliner { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - // recurse down and optimize children first - let optimized_plan = utils::optimize_children(self, plan, config)?; + // Map over the expressions and rewrite them + let new_expressions = plan + .expressions() + .into_iter() + .map(|expr| rewrite_add_one(expr)) + .collect::>>()?; - match optimized_plan { - Some(LogicalPlan::Projection(projection)) => { - let proj_expression = projection - .expr - .iter() - .map(|expr| rewrite_add_one(expr.clone())) - .collect::>>()?; + let inputs = plan.inputs().into_iter().cloned().collect::>(); - let proj = Projection::try_new(proj_expression, projection.input)?; + let plan = plan.with_new_exprs(&new_expressions, &inputs); - Ok(Some(LogicalPlan::Projection(proj))) - } - Some(optimized_plan) => Ok(Some(optimized_plan)), - None => match plan { - LogicalPlan::Projection(projection) => { - let proj_expression = projection - .expr - .iter() - .map(|expr| rewrite_add_one(expr.clone())) - .collect::>>()?; - - let proj = Projection::try_new(proj_expression, projection.input.clone())?; - - Ok(Some(LogicalPlan::Projection(proj))) - } - _ => Ok(None), - }, - } + plan.map(Some) } } ``` -Note the use of `rewrite_add_one` which is mapped over the `expr` of the `Projection`. This is the function we wrote earlier that takes an `Expr` and returns a `Result`. +Note the use of `rewrite_add_one` which is mapped over `plan.expressions()` to rewrite the expressions, then `plan.with_new_exprs` is used to create a new `LogicalPlan` with the rewritten expressions. We're almost there. Let's just test our rule works properly. 
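Before wiring the rule into a session, it can be worth sanity-checking `rewrite_add_one` on a single expression. This fragment reuses the `add_one_udf` and `rewrite_add_one` definitions from above, and the equality assertion is an assumption about how the rewritten `Expr` compares, not something guaranteed by the API:

```rust
use datafusion::prelude::*;

// `add_one(a)` as an Expr, built from the UDF defined earlier
let udf_expr = add_one_udf.call(vec![col("a")]);

// apply the expression-level rewrite directly
let rewritten = rewrite_add_one(udf_expr)?;

// the UDF call should now be the plain binary expression `a + Int64(1)`
assert_eq!(rewritten, col("a") + lit(1i64));
```

With the expression-level rewrite checked, the remaining question is whether the rule actually runs during planning, which is what the test below exercises.
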
From d8c9add0092b347d326d79701223ecbbbb451aa6 Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Fri, 25 Aug 2023 07:26:00 -0700 Subject: [PATCH 7/7] docs: update w/ feedback --- docs/source/library-user-guide/adding-udfs.md | 19 +++++++------- .../library-user-guide/working-with-exprs.md | 25 +++++++++++++++++-- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index 20f3987412e1..d3f31bd45aee 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -41,20 +41,19 @@ use std::sync::Arc; use arrow::array::{ArrayRef, Int64Array}; use datafusion::common::Result; -use datafusion::common::cast::as_int32_array; +use datafusion::common::cast::as_int64_array; pub fn add_one(args: &[ArrayRef]) -> Result { + // Error handling omitted for brevity + let i64s = as_int64_array(&args[0])?; - let array = i64s - .iter() - .map(|sequence| match sequence { - Some(value) => Some(value + 1), - None => None, - }) - .collect::(); + let new_array = i64s + .iter() + .map(|array_elem| array_elem.map(|value| value + 1)) + .collect::(); - Ok(Arc::new(array)) + Ok(Arc::new(new_array)) } ``` @@ -93,7 +92,7 @@ A few things to note: - The first argument is the name of the function. This is the name that will be used in SQL queries. - The second argument is a vector of `DataType`s. This is the list of argument types that the function accepts. I.e. in this case, the function accepts a single `Int64` argument. - The third argument is the return type of the function. I.e. in this case, the function returns an `Int64`. -- The fourth argument is the volatility of the function. This is an enum with three options: `Immutable`, `Stable`, and `Volatile`. This is used to determine if the function can be cached in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input. +- The fourth argument is the volatility of the function. In short, this is used to determine if the function's performance can be optimized in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input. - The fifth argument is the function implementation. This is the function that we defined above. That gives us a `ScalarUDF` that we can register with the `SessionContext`: diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index ec2a15faa8e2..507e984acb0b 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -25,7 +25,28 @@ For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. -As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. +As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. 
The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. As a classic expression tree, this would look like: + +```text + ┌────────────────────┐ + │ BinaryExpr │ + │ op: + │ + └────────────────────┘ + ▲ ▲ + ┌───────┘ └────────────────┐ + │ │ +┌────────────────────┐ ┌────────────────────┐ +│ Expr::Col │ │ BinaryExpr │ +│ col: a │ │ op: * │ +└────────────────────┘ └────────────────────┘ + ▲ ▲ + ┌────────┘ └─────────┐ + │ │ + ┌────────────────────┐ ┌────────────────────┐ + │ Expr::Col │ │ Expr::Col │ + │ col: b │ │ col: c │ + └────────────────────┘ └────────────────────┘ +``` As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF. @@ -46,7 +67,7 @@ let add_one_udf = create_udf( vec![DataType::Int64], Arc::new(DataType::Int64), Volatility::Immutable, - make_scalar_function(add_one), + make_scalar_function(add_one), // <-- the function we wrote ); // make the expr `add_one(5)`