Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datafusion] NOW() function support #288

Merged
merged 27 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9db149a
Add initial implementation of NOW
msathis May 7, 2021
d506280
Run rustfmt
msathis May 7, 2021
efbc021
Change incorrect condition
msathis May 7, 2021
c90cca3
Add timestamp optimizer which optimizes the logical plan and makes su…
msathis May 8, 2021
148b495
Add unit tests & fix alias
msathis May 8, 2021
a07bb7e
Add unit tests & fix alias
msathis May 8, 2021
d68fd88
Run cargo fmt
msathis May 8, 2021
25d50f8
Comment out failing test
msathis May 9, 2021
32304cf
Optimize the match to fix clippy
msathis May 9, 2021
47e0edb
Initialize datetime during optimize not creation
msathis May 9, 2021
3ac4a65
Add assertion to compare multiple now() values
msathis May 9, 2021
24c5bf5
Run cargo fmt
msathis May 9, 2021
4ba698f
Move timestamp to execution props
msathis May 10, 2021
964b07d
Add missing prop
msathis May 10, 2021
67a76bb
Add missing prop
msathis May 10, 2021
6ebdbdc
Remove duplicated code
msathis May 10, 2021
9335a70
Fix tests & format
msathis May 10, 2021
12a4964
Fix clippy
msathis May 10, 2021
3cb9e7b
Revert clippy fix
msathis May 10, 2021
280f982
Update datafusion/src/execution/context.rs
msathis May 10, 2021
a6462dd
Fix review comments. Move timestamp evaluation logic to constant_fold…
msathis May 11, 2021
1370742
Merge remote-tracking branch 'origin/add-now-function' into add-now-f…
msathis May 11, 2021
d9cb005
Pass ExecutionProps to scalar functions
msathis May 12, 2021
4d05f0f
Revert "Pass ExecutionProps to scalar functions"
msathis May 14, 2021
930aaae
Add closure approach from @alamb
msathis May 14, 2021
1987ac3
Re-enable concat test
msathis May 14, 2021
9615d92
Changing Option<DateTime<Utc>> to DateTime<Utc>
msathis May 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
use datafusion::logical_plan::{DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::expressions::col;
Expand Down Expand Up @@ -226,6 +228,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};

let input_schema = hash_agg
Expand Down Expand Up @@ -390,6 +393,7 @@ fn compile_expr(
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};
let expr: Expr = expr.try_into()?;
df_planner
Expand Down
128 changes: 86 additions & 42 deletions datafusion/src/execution/context.rs

Large diffs are not rendered by default.

107 changes: 103 additions & 4 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use std::sync::Arc;
use arrow::datatypes::DataType;

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, Operator};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;

/// Optimizer that simplifies comparison expressions involving boolean literals.
Expand All @@ -47,20 +49,25 @@ impl ConstantFolding {
}

impl OptimizerRule for ConstantFolding {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
// We need to pass down the all schemas within the plan tree to `optimize_expr` in order to
// to evaluate expression types. For example, a projection plan's schema will only include
// projected columns. With just the projected schema, it's not possible to infer types for
// expressions that references non-projected columns within the same project plan or its
// children plans.
let mut rewriter = ConstantRewriter {
schemas: plan.all_schemas(),
execution_props,
};

match plan {
LogicalPlan::Filter { predicate, input } => Ok(LogicalPlan::Filter {
predicate: predicate.clone().rewrite(&mut rewriter)?,
input: Arc::new(self.optimize(input)?),
input: Arc::new(self.optimize(input, execution_props)?),
}),
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
Expand All @@ -78,7 +85,7 @@ impl OptimizerRule for ConstantFolding {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan))
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

let expr = plan
Expand All @@ -103,6 +110,7 @@ impl OptimizerRule for ConstantFolding {
struct ConstantRewriter<'a> {
/// input schemas
schemas: Vec<&'a DFSchemaRef>,
execution_props: &'a ExecutionProps,
}

impl<'a> ConstantRewriter<'a> {
Expand Down Expand Up @@ -200,6 +208,14 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> {
Expr::Not(inner)
}
}
Expr::ScalarFunction {
msathis marked this conversation as resolved.
Show resolved Hide resolved
fun: BuiltinScalarFunction::Now,
..
} => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
self.execution_props
.query_execution_start_time
.timestamp_nanos(),
))),
expr => {
// no rewrite possible
expr
Expand All @@ -217,6 +233,7 @@ mod tests {
};

use arrow::datatypes::*;
use chrono::{DateTime, Utc};

fn test_table_scan() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
Expand All @@ -243,6 +260,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

assert_eq!(
Expand All @@ -258,6 +276,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

// x = null is always null
Expand Down Expand Up @@ -293,6 +312,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
Expand Down Expand Up @@ -323,6 +343,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

// When one of the operand is not of boolean type, folding the other boolean constant will
Expand Down Expand Up @@ -362,6 +383,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
Expand Down Expand Up @@ -397,6 +419,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

// when one of the operand is not of boolean type, folding the other boolean constant will
Expand Down Expand Up @@ -432,6 +455,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
execution_props: &ExecutionProps::new(),
};

assert_eq!(
Expand Down Expand Up @@ -459,7 +483,9 @@ mod tests {

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = ConstantFolding::new();
let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
Expand Down Expand Up @@ -589,4 +615,77 @@ mod tests {
assert_optimized_plan_eq(&plan, expected);
Ok(())
}

fn get_optimized_plan_formatted(
plan: &LogicalPlan,
date_time: &DateTime<Utc>,
) -> String {
let rule = ConstantFolding::new();
let execution_props = ExecutionProps {
query_execution_start_time: *date_time,
};

let optimized_plan = rule
.optimize(plan, &execution_props)
.expect("failed to optimize plan");
return format!("{:?}", optimized_plan);
}

#[test]
fn single_now_expr() {
let table_scan = test_table_scan().unwrap();
let proj = vec![Expr::ScalarFunction {
args: vec![],
fun: BuiltinScalarFunction::Now,
}];
let time = chrono::Utc::now();
let plan = LogicalPlanBuilder::from(&table_scan)
.project(proj)
.unwrap()
.build()
.unwrap();

let expected = format!(
"Projection: TimestampNanosecond({})\
\n TableScan: test projection=None",
time.timestamp_nanos()
);
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
}

#[test]
fn multiple_now_expr() {
let table_scan = test_table_scan().unwrap();
let time = chrono::Utc::now();
let proj = vec![
Expr::ScalarFunction {
args: vec![],
fun: BuiltinScalarFunction::Now,
},
Expr::Alias(
Box::new(Expr::ScalarFunction {
args: vec![],
fun: BuiltinScalarFunction::Now,
}),
"t2".to_string(),
),
];
let plan = LogicalPlanBuilder::from(&table_scan)
.project(proj)
.unwrap()
.build()
.unwrap();

let actual = get_optimized_plan_formatted(&plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}), TimestampNanosecond({}) AS t2\
\n TableScan: test projection=None",
time.timestamp_nanos(),
time.timestamp_nanos()
);

assert_eq!(actual, expected);
}
}
13 changes: 10 additions & 3 deletions datafusion/src/optimizer/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;

use super::utils;
use crate::execution::context::ExecutionProps;

/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
pub struct EliminateLimit;
Expand All @@ -34,7 +35,11 @@ impl EliminateLimit {
}

impl OptimizerRule for EliminateLimit {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit { n, input } if *n == 0 => {
Ok(LogicalPlan::EmptyRelation {
Expand All @@ -50,7 +55,7 @@ impl OptimizerRule for EliminateLimit {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan))
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Expand All @@ -72,7 +77,9 @@ mod tests {

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Expand Down
7 changes: 5 additions & 2 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{and, LogicalPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
Expand Down Expand Up @@ -413,7 +414,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -456,7 +457,9 @@ mod tests {

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = FilterPushDown::new();
let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
let optimized_plan = rule
.optimize(plan, &ExecutionProps::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
Expand Down
17 changes: 11 additions & 6 deletions datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::{error::Result, prelude::JoinType};

use super::utils;
use crate::execution::context::ExecutionProps;

/// BuildProbeOrder reorders the build and probe phase of
/// hash joins. This uses the amount of rows that a datasource has.
Expand Down Expand Up @@ -106,7 +107,11 @@ impl OptimizerRule for HashBuildProbeOrder {
"hash_build_probe_order"
}

fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match plan {
// Main optimization rule, swaps order of left and right
// based on number of rows in each table
Expand All @@ -117,8 +122,8 @@ impl OptimizerRule for HashBuildProbeOrder {
join_type,
schema,
} => {
let left = self.optimize(left)?;
let right = self.optimize(right)?;
let left = self.optimize(left, execution_props)?;
let right = self.optimize(right, execution_props)?;
if should_swap_join_order(&left, &right) {
// Swap left and right, change join type and (equi-)join key order
Ok(LogicalPlan::Join {
Expand Down Expand Up @@ -147,8 +152,8 @@ impl OptimizerRule for HashBuildProbeOrder {
right,
schema,
} => {
let left = self.optimize(left)?;
let right = self.optimize(right)?;
let left = self.optimize(left, execution_props)?;
let right = self.optimize(right, execution_props)?;
if should_swap_join_order(&left, &right) {
// Swap left and right
Ok(LogicalPlan::CrossJoin {
Expand Down Expand Up @@ -184,7 +189,7 @@ impl OptimizerRule for HashBuildProbeOrder {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan))
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Expand Down
Loading