Commit
Merge remote-tracking branch 'apache/main' into improve-cse-volatile-handling
alamb committed Jul 6, 2024
2 parents 1aa8848 + 5657886 commit 4a0425a
Showing 67 changed files with 2,782 additions and 855 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
@@ -47,6 +47,7 @@ cargo run --example dataframe
- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
@@ -72,7 +73,6 @@ cargo run --example dataframe
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
- [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_udaf.rs
@@ -92,7 +92,7 @@ impl AggregateUDFImpl for GeoMeanUdaf {
}

/// This is the description of the state. The accumulator's `state()` must match the types here.
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![
Field::new("prod", args.return_type.clone(), true),
Field::new("n", DataType::UInt32, true),
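For reference, the accumulator's `state()` must return one `ScalarValue` per field declared in `state_fields()`, in the same order. A minimal sketch of a matching implementation, assuming the accumulator tracks `prod: f64` and `n: u32` as in the surrounding example (this sketch is not part of the diff):

// Hedged sketch: the values returned here must line up, in order and type,
// with the Fields declared in state_fields() above.
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    Ok(vec![
        ScalarValue::from(self.prod), // matches the "prod" field
        ScalarValue::from(self.n),    // matches the "n" field (UInt32)
    ])
}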
200 changes: 200 additions & 0 deletions datafusion-examples/examples/analyzer_rule.rs
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_optimizer::analyzer::AnalyzerRule;
use std::sync::{Arc, Mutex};

/// This example demonstrates how to add your own [`AnalyzerRule`] to
/// DataFusion.
///
/// [`AnalyzerRule`]s transform [`LogicalPlan`]s prior to the DataFusion
/// optimization process, and can be used to change the plan's semantics (e.g.
/// output types).
///
/// This example shows an `AnalyzerRule` which implements a simplistic row
/// level access control scheme by introducing a filter to the query.
///
/// See [optimizer_rule.rs] for an example of an optimizer rule.
#[tokio::main]
pub async fn main() -> Result<()> {
// AnalyzerRules run before OptimizerRules.
//
// DataFusion includes several built-in AnalyzerRules for tasks such as type
// coercion, which change the types of expressions in the plan. Add our new
// rule to the context to run it during the analysis phase.
let rule = Arc::new(RowLevelAccessControl::new());
let ctx = SessionContext::new();
ctx.add_analyzer_rule(Arc::clone(&rule) as _);

ctx.register_batch("employee", employee_batch())?;

// Now, planning any SQL statement also invokes the AnalyzerRule
let plan = ctx
.sql("SELECT * FROM employee")
.await?
.into_optimized_plan()?;

// Printing the query plan shows a filter has been added
//
// Filter: employee.position = Utf8("Engineer")
// TableScan: employee projection=[name, age, position]
println!("Logical Plan:\n\n{}\n", plan.display_indent());

// Execute the query; indeed, no Managers are returned
//
// +-----------+-----+----------+
// | name | age | position |
// +-----------+-----+----------+
// | Andy | 11 | Engineer |
// | Oleks | 33 | Engineer |
// | Xiangpeng | 55 | Engineer |
// +-----------+-----+----------+
ctx.sql("SELECT * FROM employee").await?.show().await?;

// We can now change the access level to "Manager" and see the results
//
// +----------+-----+----------+
// | name | age | position |
// +----------+-----+----------+
// | Andrew | 22 | Manager |
// | Chunchun | 44 | Manager |
// +----------+-----+----------+
rule.set_show_position("Manager");
ctx.sql("SELECT * FROM employee").await?.show().await?;

// The filters introduced by our AnalyzerRule are treated the same as any
// other filter by the DataFusion optimizer, with optimizations such as
// predicate pushdown (including into scans), simplification, and so on.
//
// For example, adding another predicate to the query
let plan = ctx
.sql("SELECT * FROM employee WHERE age > 30")
.await?
.into_optimized_plan()?;

// We can see the DataFusion Optimizer has combined the filters
// when we print out the plan
//
// Filter: employee.age > Int32(30) AND employee.position = Utf8("Manager")
// TableScan: employee projection=[name, age, position]
println!("Logical Plan:\n\n{}\n", plan.display_indent());

Ok(())
}

/// Example AnalyzerRule that implements a very basic "row level access
/// control"
///
/// In this case, it adds a filter to the plan that keeps only rows whose
/// position matches the session's access level (initially "Engineer", so
/// managers are filtered out of the result set).
#[derive(Debug)]
struct RowLevelAccessControl {
/// Models the current access level of the session
///
/// This is the value of the `position` column which should be included in
/// the result set. It is wrapped in a `Mutex` so we can change it during
/// query execution.
show_position: Mutex<String>,
}

impl RowLevelAccessControl {
fn new() -> Self {
Self {
show_position: Mutex::new("Engineer".to_string()),
}
}

/// return the current position to show, as an expression
fn show_position(&self) -> Expr {
lit(self.show_position.lock().unwrap().clone())
}

/// specifies a different position to show in the result set
fn set_show_position(&self, access_level: impl Into<String>) {
*self.show_position.lock().unwrap() = access_level.into();
}
}

impl AnalyzerRule for RowLevelAccessControl {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
// use the TreeNode API to recursively walk the LogicalPlan tree
// and all of its children (inputs)
let transformed_plan = plan.transform(|plan| {
// This closure is called for each LogicalPlan node
// if it is a Scan node, add a filter to remove all managers
if is_employee_table_scan(&plan) {
// Use the LogicalPlanBuilder to add a filter to the plan
let filter = LogicalPlanBuilder::from(plan)
// Filter Expression: position = <access level>
.filter(col("position").eq(self.show_position()))?
.build()?;

// `Transformed::yes` signals the plan was changed
Ok(Transformed::yes(filter))
} else {
// `Transformed::no` signals the plan was not changed
Ok(Transformed::no(plan))
}
})?;

// the result of calling transform is a `Transformed` structure which
// contains
//
// 1. a flag signaling if any rewrite took place
// 2. a flag indicating if the recursion stopped early
// 3. the actual transformed data (a LogicalPlan in this case)
//
// This example does not need the value of either flag, so simply
// extract the LogicalPlan "data"
Ok(transformed_plan.data)
}

fn name(&self) -> &str {
"table_access"
}
}

fn is_employee_table_scan(plan: &LogicalPlan) -> bool {
if let LogicalPlan::TableScan(scan) = plan {
scan.table_name.table() == "employee"
} else {
false
}
}

/// Return a RecordBatch with made up data about fictional employees
fn employee_batch() -> RecordBatch {
let name: ArrayRef = Arc::new(StringArray::from_iter_values([
"Andy",
"Andrew",
"Oleks",
"Chunchun",
"Xiangpeng",
]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33, 44, 55]));
let position = Arc::new(StringArray::from_iter_values([
"Engineer", "Manager", "Engineer", "Manager", "Engineer",
]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age), ("position", position)])
.unwrap()
}
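The expected outputs embedded in the comments above could also be checked mechanically with `assert_batches_eq` (imported from `datafusion_common`), in the same style the optimizer_rule.rs hunk below adopts. A hedged sketch, not part of this commit:

// Hedged sketch: assert the filtered result instead of just printing it,
// mirroring the assert_batches_eq! usage in optimizer_rule.rs below.
assert_batches_eq!(
    [
        "+-----------+-----+----------+",
        "| name      | age | position |",
        "+-----------+-----+----------+",
        "| Andy      | 11  | Engineer |",
        "| Oleks     | 33  | Engineer |",
        "| Xiangpeng | 55  | Engineer |",
        "+-----------+-----+----------+",
    ],
    &ctx.sql("SELECT * FROM employee").await?.collect().await?
);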
57 changes: 32 additions & 25 deletions datafusion-examples/examples/optimizer_rule.rs
@@ -19,7 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::DataType;
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{assert_batches_eq, Result, ScalarValue};
use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl,
Signature, Volatility,
@@ -54,39 +54,46 @@ pub async fn main() -> Result<()> {

// We can see the effect of our rewrite in the output plan: the filter
// has been rewritten to `my_eq`
//
// Filter: my_eq(person.age, Int32(22))
// TableScan: person projection=[name, age]
println!("Logical Plan:\n\n{}\n", plan.display_indent());
assert_eq!(
plan.display_indent().to_string(),
"Filter: my_eq(person.age, Int32(22))\
\n TableScan: person projection=[name, age]"
);

// The query below doesn't respect the filter `where age = 22` because
// the plan has been rewritten to call the `my_eq` UDF, which always
// returns true, and the output verifies the predicate has been changed:
//
// +--------+-----+
// | name | age |
// +--------+-----+
// | Andy | 11 |
// | Andrew | 22 |
// | Oleks | 33 |
// +--------+-----+
ctx.sql(sql).await?.show().await?;
assert_batches_eq!(
[
"+--------+-----+",
"| name | age |",
"+--------+-----+",
"| Andy | 11 |",
"| Andrew | 22 |",
"| Oleks | 33 |",
"+--------+-----+",
],
&ctx.sql(sql).await?.collect().await?
);

// However, we can see the rule doesn't trigger for queries with predicates
// other than `=`
//
// +-------+-----+
// | name | age |
// +-------+-----+
// | Andy | 11 |
// | Oleks | 33 |
// +-------+-----+
ctx.sql("SELECT * FROM person WHERE age <> 22")
.await?
.show()
.await?;
assert_batches_eq!(
[
"+-------+-----+",
"| name | age |",
"+-------+-----+",
"| Andy | 11 |",
"| Oleks | 33 |",
"+-------+-----+",
],
&ctx.sql("SELECT * FROM person WHERE age <> 22")
.await?
.collect()
.await?
);

Ok(())
}
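The rewrite itself is elided from this hunk. Based on the plan output above (`my_eq(person.age, Int32(22))`), the heart of such an optimizer rule is an expression rewrite using the same TreeNode API as the analyzer example. A hedged sketch, where `rewrite_eq_to_udf` is a hypothetical helper and `my_eq` is the ScalarUDF the example registers:

// Hedged sketch: rewrite `a = b` into `my_eq(a, b)` using the TreeNode API.
fn rewrite_eq_to_udf(expr: Expr, my_eq: &ScalarUDF) -> Result<Transformed<Expr>> {
    expr.transform(|e| {
        if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = e {
            // Replace the equality with a call to the my_eq UDF
            Ok(Transformed::yes(my_eq.call(vec![*left, *right])))
        } else {
            // Leave all other expressions untouched
            Ok(Transformed::no(e))
        }
    })
}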
