Skip to content

Commit

Permalink
Pass ExecutionProps to scalar functions
Browse files Browse the repository at this point in the history
  • Loading branch information
msathis committed May 12, 2021
1 parent 1370742 commit d9cb005
Show file tree
Hide file tree
Showing 17 changed files with 842 additions and 482 deletions.
3 changes: 2 additions & 1 deletion datafusion-examples/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::arrow::{
util::pretty,
};

use datafusion::execution::context::ExecutionProps;
use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use std::sync::Arc;
Expand Down Expand Up @@ -60,7 +61,7 @@ async fn main() -> Result<()> {
let mut ctx = create_context()?;

// First, declare the actual implementation of the calculation
let pow = |args: &[ArrayRef]| {
let pow = |args: &[ArrayRef], _: &ExecutionProps| {
// in DataFusion, all `args` and output are dynamically-typed arrays, which means that we need to:
// 1. cast the values to the type we want
// 2. perform the computation for every element in the array (using a loop or SIMD) and construct the result
Expand Down
23 changes: 11 additions & 12 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ impl ExecutionContext {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
state.execution_props.start_execution();
state
.config
.query_planner
Expand Down Expand Up @@ -746,13 +747,13 @@ impl ExecutionConfig {
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
/// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
/// execution (optimized). If the same plan is optimized multiple times, a new
/// `ExecutionProps` is created each time.
/// Holds per-execution properties and data (such as starting timestamps, etc).
/// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
/// execution (optimized). If the same plan is optimized multiple times, a new
/// `ExecutionProps` is created each time.
#[derive(Clone)]
pub struct ExecutionProps {
pub(crate) query_execution_start_time: Option<DateTime<Utc>>,
pub(crate) query_execution_start_time: DateTime<Utc>,
}

/// Execution context for registering data sources and executing queries
Expand All @@ -776,15 +777,13 @@ impl ExecutionProps {
/// Creates a new `ExecutionProps` instance
pub fn new() -> Self {
ExecutionProps {
query_execution_start_time: None,
query_execution_start_time: chrono::Utc::now(),
}
}

/// Marks the execution of query started
pub fn start_execution(&mut self) -> &Self {
if self.query_execution_start_time.is_none() {
self.query_execution_start_time = Some(chrono::Utc::now());
}
self.query_execution_start_time = chrono::Utc::now();
&*self
}
}
Expand Down Expand Up @@ -2096,7 +2095,7 @@ mod tests {
ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
.unwrap();

let myfunc = |args: &[ArrayRef]| Ok(Arc::clone(&args[0]));
let myfunc = |args: &[ArrayRef], _: &ExecutionProps| Ok(Arc::clone(&args[0]));
let myfunc = make_scalar_function(myfunc);

ctx.register_udf(create_udf(
Expand Down Expand Up @@ -2376,7 +2375,7 @@ mod tests {
let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
ctx.register_table("t", Arc::new(provider))?;

let myfunc = |args: &[ArrayRef]| {
let myfunc = |args: &[ArrayRef], _: &ExecutionProps| {
let l = &args[0]
.as_any()
.downcast_ref::<Int32Array>()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ mod tests {

// declare the udf
let my_fn: ScalarFunctionImplementation =
Arc::new(|_: &[ColumnarValue]| unimplemented!("my_fn is not implemented"));
Arc::new(|_: &[ColumnarValue], _| unimplemented!("my_fn is not implemented"));

// create and register the udf
ctx.register_udf(create_udf(
Expand Down
5 changes: 2 additions & 3 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
use chrono::{DateTime, Utc};

/// Optimizer that simplifies comparison expressions involving boolean literals.
///
Expand Down Expand Up @@ -215,7 +214,6 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> {
} => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
self.execution_props
.query_execution_start_time
.unwrap()
.timestamp_nanos(),
))),
expr => {
Expand All @@ -235,6 +233,7 @@ mod tests {
};

use arrow::datatypes::*;
use chrono::{DateTime, Utc};

fn test_table_scan() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
Expand Down Expand Up @@ -623,7 +622,7 @@ mod tests {
) -> String {
let rule = ConstantFolding::new();
let execution_props = ExecutionProps {
query_execution_start_time: Some(date_time.clone()),
query_execution_start_time: date_time.clone(),
};

let optimized_plan = rule
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::datatypes::DataType;
use std::sync::Arc;

use super::ColumnarValue;
use crate::execution::context::ExecutionProps;

macro_rules! downcast_vec {
($ARGS:expr, $ARRAY_TYPE:ident) => {{
Expand Down Expand Up @@ -90,7 +91,7 @@ fn array_array(args: &[&dyn Array]) -> Result<ArrayRef> {
}

/// put values in an array.
pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn array(values: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
let arrays: Vec<&dyn Array> = values
.iter()
.map(|value| {
Expand Down
11 changes: 6 additions & 5 deletions datafusion/src/physical_plan/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use arrow::{
};

use super::{string_expressions::unary_string_function, ColumnarValue};
use crate::execution::context::ExecutionProps;

/// Computes the md5 of a string.
fn md5_process(input: &str) -> String {
Expand Down Expand Up @@ -144,7 +145,7 @@ fn md5_array<T: StringOffsetSizeTrait>(
}

/// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn md5(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(md5_array::<i32>(&[
Expand Down Expand Up @@ -178,21 +179,21 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}

/// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
pub fn sha224(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn sha224(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
handle(args, sha_process::<Sha224>, "ssh224")
}

/// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
pub fn sha256(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn sha256(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
handle(args, sha_process::<Sha256>, "sha256")
}

/// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
pub fn sha384(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn sha384(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
handle(args, sha_process::<Sha384>, "sha384")
}

/// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
pub fn sha512(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn sha512(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
handle(args, sha_process::<Sha512>, "sha512")
}
18 changes: 11 additions & 7 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::sync::Arc;

use super::ColumnarValue;
use crate::execution::context::ExecutionProps;
use crate::{
error::{DataFusionError, Result},
scalar::{ScalarType, ScalarValue},
Expand Down Expand Up @@ -260,7 +261,7 @@ where
}

/// to_timestamp SQL function
pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn to_timestamp(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
args,
string_to_timestamp_nanos,
Expand All @@ -269,9 +270,12 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}

/// now SQL function
pub fn now(_: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn now(
_: &[ColumnarValue],
execution_props: &ExecutionProps,
) -> Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(chrono::Utc::now().timestamp_nanos()),
Some(execution_props.query_execution_start_time.timestamp_nanos()),
)))
}

Expand Down Expand Up @@ -315,7 +319,7 @@ fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
}

/// date_trunc SQL function
pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn date_trunc(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
let (granularity, array) = (&args[0], &args[1]);

let granularity =
Expand Down Expand Up @@ -404,7 +408,7 @@ macro_rules! extract_date_part {
}

/// DATE_PART SQL function
pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn date_part(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
if args.len() != 2 {
return Err(DataFusionError::Execution(
"Expected two arguments in DATE_PART".to_string(),
Expand Down Expand Up @@ -470,7 +474,7 @@ mod tests {

let string_array =
ColumnarValue::Array(Arc::new(string_builder.finish()) as ArrayRef);
let parsed_timestamps = to_timestamp(&[string_array])
let parsed_timestamps = to_timestamp(&[string_array], &ExecutionProps::new())
.expect("that to_timestamp parsed values without error");
if let ColumnarValue::Array(parsed_array) = parsed_timestamps {
assert_eq!(parsed_array.len(), 2);
Expand Down Expand Up @@ -550,7 +554,7 @@ mod tests {

let expected_err =
"Internal error: Unsupported data type Int64 for function to_timestamp";
match to_timestamp(&[int64array]) {
match to_timestamp(&[int64array], &ExecutionProps::new()) {
Ok(_) => panic!("Expected error but got success"),
Err(e) => {
assert!(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/expressions/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use super::ColumnarValue;
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::scalar::ScalarValue;
use arrow::array::Array;
use arrow::array::{
Expand Down Expand Up @@ -71,7 +72,7 @@ macro_rules! primitive_bool_array_op {
/// Args: 0 - left expr is any array
/// 1 - the expr to compare against; if the left value equals it, the result is NULL, otherwise the left value is passed through.
///
pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn nullif_func(args: &[ColumnarValue], _: &ExecutionProps) -> Result<ColumnarValue> {
if args.len() != 2 {
return Err(DataFusionError::Internal(format!(
"{:?} args were supplied but NULLIF takes exactly two args",
Expand Down Expand Up @@ -142,7 +143,7 @@ mod tests {

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));

let result = nullif_func(&[a, lit_array])?;
let result = nullif_func(&[a, lit_array], &ExecutionProps::new())?;
let result = result.into_array(0);

let expected = Arc::new(Int32Array::from(vec![
Expand All @@ -168,7 +169,7 @@ mod tests {

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));

let result = nullif_func(&[a, lit_array])?;
let result = nullif_func(&[a, lit_array], &ExecutionProps::new())?;
let result = result.into_array(0);

let expected = Arc::new(Int32Array::from(vec![
Expand Down
Loading

0 comments on commit d9cb005

Please sign in to comment.