From 1b79bd5e2d86296684e449a0587b243b518183f1 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 19 Apr 2023 10:58:46 +0300 Subject: [PATCH 1/6] ready to review --- .../physical-expr/src/expressions/binary.rs | 52 +++++-- .../src/expressions/binary/kernels_arrow.rs | 133 +++++++++++++++++- .../physical-expr/src/expressions/datetime.rs | 116 +++------------ 3 files changed, 189 insertions(+), 112 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 070c12eb6f1f..a9d023f835e8 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -68,14 +68,15 @@ use kernels::{ bitwise_xor, bitwise_xor_scalar, }; use kernels_arrow::{ - add_decimal_dyn_scalar, add_dyn_decimal, divide_decimal_dyn_scalar, - divide_dyn_opt_decimal, is_distinct_from, is_distinct_from_bool, - is_distinct_from_decimal, is_distinct_from_f32, is_distinct_from_f64, - is_distinct_from_null, is_distinct_from_utf8, is_not_distinct_from, - is_not_distinct_from_bool, is_not_distinct_from_decimal, is_not_distinct_from_f32, - is_not_distinct_from_f64, is_not_distinct_from_null, is_not_distinct_from_utf8, - modulus_decimal_dyn_scalar, modulus_dyn_decimal, multiply_decimal_dyn_scalar, - multiply_dyn_decimal, subtract_decimal_dyn_scalar, subtract_dyn_decimal, + add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar, + divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from, + is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32, + is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8, + is_not_distinct_from, is_not_distinct_from_bool, is_not_distinct_from_decimal, + is_not_distinct_from_f32, is_not_distinct_from_f64, is_not_distinct_from_null, + is_not_distinct_from_utf8, modulus_decimal_dyn_scalar, modulus_dyn_decimal, + multiply_decimal_dyn_scalar, multiply_dyn_decimal, subtract_decimal_dyn_scalar, + subtract_dyn_decimal, subtract_dyn_temporal, subtract_dyn_temporal_scalar, }; use arrow::datatypes::{DataType, Schema, TimeUnit}; @@ -1266,10 +1267,39 @@ macro_rules! sub_timestamp_macro { Arc::new(ret) as ArrayRef }}; } + +pub fn resolve_temporal_op( + lhs: &ArrayRef, + sign: i32, + rhs: &ArrayRef, +) -> Result { + match sign { + 1 => add_dyn_temporal(lhs, rhs), + -1 => subtract_dyn_temporal(lhs, rhs), + other => Err(DataFusionError::Internal(format!( + "Undefined operation for temporal types {other}" + ))), + } +} + +pub fn resolve_temporal_op_scalar( + lhs: &ArrayRef, + sign: i32, + rhs: &ScalarValue, +) -> Result { + match sign { + 1 => add_dyn_temporal_scalar(lhs, rhs), + -1 => subtract_dyn_temporal_scalar(lhs, rhs), + other => Err(DataFusionError::Internal(format!( + "Undefined operation for temporal types {other}" + ))), + } +} + /// This function handles the Timestamp - Timestamp operations, /// where the first one is an array, and the second one is a scalar, /// hence the result is also an array. -pub fn ts_scalar_ts_op(array: ArrayRef, scalar: &ScalarValue) -> Result { +pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result { let ret = match (array.data_type(), scalar) { ( DataType::Timestamp(TimeUnit::Second, opt_tz_lhs), @@ -1364,7 +1394,7 @@ macro_rules! sub_timestamp_interval_macro { /// where the first one is an array, and the second one is a scalar, /// hence the result is also an array. pub fn ts_scalar_interval_op( - array: ArrayRef, + array: &ArrayRef, sign: i32, scalar: &ScalarValue, ) -> Result { @@ -1448,7 +1478,7 @@ macro_rules! sub_interval_cross_macro { /// where the first one is an array, and the second one is a scalar, /// hence the result is also an interval array. pub fn interval_scalar_interval_op( - array: ArrayRef, + array: &ArrayRef, sign: i32, scalar: &ScalarValue, ) -> Result { diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index 836ea93450be..6538cb4186ec 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -21,15 +21,22 @@ use arrow::compute::{ add_dyn, add_scalar_dyn, divide_dyn_opt, divide_scalar_dyn, modulus_dyn, modulus_scalar_dyn, multiply_dyn, multiply_scalar_dyn, subtract_dyn, - subtract_scalar_dyn, + subtract_scalar_dyn, try_unary, }; -use arrow::datatypes::Decimal128Type; +use arrow::datatypes::{Date32Type, Date64Type, Decimal128Type}; use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array}; use arrow_schema::DataType; -use datafusion_common::cast::as_decimal128_array; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array}; +use datafusion_common::scalar::{date32_add, date64_add}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::ColumnarValue; use std::sync::Arc; +use super::{ + interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op, + ts_scalar_interval_op, ts_scalar_ts_op, +}; + // Simple (low performance) kernels until optimized kernels are added to arrow // See https://github.com/apache/arrow-rs/issues/960 @@ -270,6 +277,62 @@ pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result Result { + match (left.data_type(), right.data_type()) { + (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { + ts_array_op(left, right) + } + (DataType::Interval(_), DataType::Interval(_)) => { + interval_array_op(left, right, 1) + } + (DataType::Timestamp(_, _), DataType::Interval(_)) => { + ts_interval_array_op(left, 1, right) + } + (DataType::Interval(_), DataType::Timestamp(_, _)) => { + ts_interval_array_op(right, 1, left) + } + (_, _) => { + // fall back to kernels in arrow-rs + Ok(arrow::compute::add_dyn(left, right)?) + } + } +} + +pub(crate) fn add_dyn_temporal_scalar( + left: &ArrayRef, + right: &ScalarValue, +) -> Result { + match (left.data_type(), right.get_datatype()) { + (DataType::Date32, DataType::Interval(_)) => { + let left = as_date32_array(&left)?; + let ret = Arc::new(try_unary::(left, |days| { + Ok(date32_add(days, right, 1)?) + })?) as ArrayRef; + Ok(ColumnarValue::Array(ret)) + } + (DataType::Date64, DataType::Interval(_)) => { + let left = as_date64_array(&left)?; + let ret = Arc::new(try_unary::(left, |ms| { + Ok(date64_add(ms, right, 1)?) + })?) as ArrayRef; + Ok(ColumnarValue::Array(ret)) + } + (DataType::Interval(_), DataType::Interval(_)) => { + interval_scalar_interval_op(left, 1, right) + } + (DataType::Timestamp(_, _), DataType::Interval(_)) => { + ts_scalar_interval_op(left, 1, right) + } + (_, _) => { + // fall back to kernels in arrow-rs + Ok(ColumnarValue::Array(arrow::compute::add_dyn( + left, + &right.to_array(), + )?)) + } + } +} + pub(crate) fn subtract_decimal_dyn_scalar( left: &dyn Array, right: i128, @@ -280,6 +343,68 @@ pub(crate) fn subtract_decimal_dyn_scalar( decimal_array_with_precision_scale(array, precision, scale) } +pub(crate) fn subtract_dyn_temporal( + left: &ArrayRef, + right: &ArrayRef, +) -> Result { + match (left.data_type(), right.data_type()) { + (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { + ts_array_op(left, right) + } + (DataType::Interval(_), DataType::Interval(_)) => { + interval_array_op(left, right, -1) + } + (DataType::Timestamp(_, _), DataType::Interval(_)) => { + ts_interval_array_op(left, -1, right) + } + (DataType::Interval(_), DataType::Timestamp(_, _)) => { + ts_interval_array_op(right, -1, left) + } + (_, _) => { + // fall back to kernels in arrow-rs + Ok(arrow::compute::subtract_dyn(left, right)?) + } + } +} + +pub(crate) fn subtract_dyn_temporal_scalar( + left: &ArrayRef, + right: &ScalarValue, +) -> Result { + match (left.data_type(), right.get_datatype()) { + (DataType::Date32, DataType::Interval(_)) => { + let left = as_date32_array(&left)?; + let ret = Arc::new(try_unary::(left, |days| { + Ok(date32_add(days, right, -1)?) + })?) as ArrayRef; + Ok(ColumnarValue::Array(ret)) + } + (DataType::Date64, DataType::Interval(_)) => { + let left = as_date64_array(&left)?; + let ret = Arc::new(try_unary::(left, |ms| { + Ok(date64_add(ms, right, -1)?) + })?) as ArrayRef; + Ok(ColumnarValue::Array(ret)) + } + (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { + ts_scalar_ts_op(left, right) + } + (DataType::Interval(_), DataType::Interval(_)) => { + interval_scalar_interval_op(left, -1, right) + } + (DataType::Timestamp(_, _), DataType::Interval(_)) => { + ts_scalar_interval_op(left, -1, right) + } + (_, _) => { + // fall back to kernels in arrow-rs + Ok(ColumnarValue::Array(arrow::compute::subtract_dyn( + left, + &right.to_array(), + )?)) + } + } +} + fn get_precision_scale(left: &dyn Array) -> Result<(u8, i8)> { match left.data_type() { DataType::Decimal128(precision, scale) => Ok((*precision, *scale)), diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index c2a54beceb1e..a8f05c6c745e 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -19,13 +19,9 @@ use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::intervals::{apply_operator, Interval}; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; -use arrow::array::{Array, ArrayRef}; -use arrow::compute::try_unary; -use arrow::datatypes::{DataType, Date32Type, Date64Type, Schema}; +use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::cast::*; -use datafusion_common::scalar::*; use datafusion_common::Result; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::type_coercion::binary::coerce_types; @@ -34,10 +30,7 @@ use std::any::Any; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use super::binary::{ - interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op, - ts_scalar_interval_op, ts_scalar_ts_op, -}; +use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar}; /// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math #[derive(Debug)] @@ -151,13 +144,18 @@ impl PhysicalExpr for DateTimeIntervalExpr { operand_lhs.sub(&operand_rhs)? })) } - (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(operand_rhs)) => { - evaluate_temporal_array(array_lhs, sign, &operand_rhs) - } - - (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => { - evaluate_temporal_arrays(&array_lhs, sign, &array_rhs) + // This function evaluates temporal array vs scalar operations, such as timestamp - timestamp, + // interval + interval, timestamp + interval, and interval + timestamp. It takes two arrays as input + // and an integer sign representing the operation (+1 for addition and -1 for subtraction). + (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(array_rhs)) => { + resolve_temporal_op_scalar(&array_lhs, sign, &array_rhs) } + // This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval, + // timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing + // the operation (+1 for addition and -1 for subtraction). + (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => Ok( + ColumnarValue::Array(resolve_temporal_op(&array_lhs, sign, &array_rhs)?), + ), (_, _) => { let msg = "If RHS of the operation is an array, then LHS also must be"; Err(DataFusionError::Internal(msg.to_string())) @@ -227,82 +225,6 @@ impl PartialEq for DateTimeIntervalExpr { } } -pub fn evaluate_temporal_array( - array: ArrayRef, - sign: i32, - scalar: &ScalarValue, -) -> Result { - match (array.data_type(), scalar.get_datatype()) { - // Date +- Interval - (DataType::Date32, DataType::Interval(_)) => { - let array = as_date32_array(&array)?; - let ret = Arc::new(try_unary::(array, |days| { - Ok(date32_add(days, scalar, sign)?) - })?) as ArrayRef; - Ok(ColumnarValue::Array(ret)) - } - (DataType::Date64, DataType::Interval(_)) => { - let array = as_date64_array(&array)?; - let ret = Arc::new(try_unary::(array, |ms| { - Ok(date64_add(ms, scalar, sign)?) - })?) as ArrayRef; - Ok(ColumnarValue::Array(ret)) - } - // Timestamp - Timestamp - (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) if sign == -1 => { - ts_scalar_ts_op(array, scalar) - } - // Interval +- Interval - (DataType::Interval(_), DataType::Interval(_)) => { - interval_scalar_interval_op(array, sign, scalar) - } - // Timestamp +- Interval - (DataType::Timestamp(_, _), DataType::Interval(_)) => { - ts_scalar_interval_op(array, sign, scalar) - } - (_, _) => Err(DataFusionError::Execution(format!( - "Invalid lhs type for DateIntervalExpr: {}", - array.data_type() - )))?, - } -} - -// This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval, -// timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing -// the operation (+1 for addition and -1 for subtraction). It returns a ColumnarValue as output, which can hold -// either a scalar or an array. -pub fn evaluate_temporal_arrays( - array_lhs: &ArrayRef, - sign: i32, - array_rhs: &ArrayRef, -) -> Result { - let ret = match (array_lhs.data_type(), array_rhs.data_type()) { - // Timestamp - Timestamp operations, operands of only the same types are supported. - (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { - ts_array_op(array_lhs, array_rhs)? - } - // Interval (+ , -) Interval operations - (DataType::Interval(_), DataType::Interval(_)) => { - interval_array_op(array_lhs, array_rhs, sign)? - } - // Timestamp (+ , -) Interval and Interval + Timestamp operations - // Interval - Timestamp operation is not rational hence not supported - (DataType::Timestamp(_, _), DataType::Interval(_)) => { - ts_interval_array_op(array_lhs, sign, array_rhs)? - } - (DataType::Interval(_), DataType::Timestamp(_, _)) if sign == 1 => { - ts_interval_array_op(array_rhs, sign, array_lhs)? - } - (_, _) => Err(DataFusionError::Execution(format!( - "Invalid array types for DateIntervalExpr: {} {} {}", - array_lhs.data_type(), - sign, - array_rhs.data_type() - )))?, - }; - Ok(ColumnarValue::Array(ret)) -} - #[cfg(test)] mod tests { use super::*; @@ -707,7 +629,7 @@ mod tests { } // In this test, ArrayRef of one element arrays is evaluated with some ScalarValues, - // aiming that evaluate_temporal_array function is working properly and shows the same + // aiming that resolve_temporal_op_scalar function is working properly and shows the same // behavior with ScalarValue arithmetic. fn experiment( timestamp_scalar: ScalarValue, @@ -718,7 +640,7 @@ mod tests { // timestamp + interval if let ColumnarValue::Array(res1) = - evaluate_temporal_array(timestamp_array.clone(), 1, &interval_scalar)? + resolve_temporal_op_scalar(×tamp_array, 1, &interval_scalar)? { let res2 = timestamp_scalar.add(&interval_scalar)?.to_array(); assert_eq!( @@ -730,7 +652,7 @@ mod tests { // timestamp - interval if let ColumnarValue::Array(res1) = - evaluate_temporal_array(timestamp_array.clone(), -1, &interval_scalar)? + resolve_temporal_op_scalar(×tamp_array, -1, &interval_scalar)? { let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array(); assert_eq!( @@ -742,7 +664,7 @@ mod tests { // timestamp - timestamp if let ColumnarValue::Array(res1) = - evaluate_temporal_array(timestamp_array.clone(), -1, ×tamp_scalar)? + resolve_temporal_op_scalar(×tamp_array, -1, ×tamp_scalar)? { let res2 = timestamp_scalar.sub(×tamp_scalar)?.to_array(); assert_eq!( @@ -754,7 +676,7 @@ mod tests { // interval - interval if let ColumnarValue::Array(res1) = - evaluate_temporal_array(interval_array.clone(), -1, &interval_scalar)? + resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar)? { let res2 = interval_scalar.sub(&interval_scalar)?.to_array(); assert_eq!( @@ -766,7 +688,7 @@ mod tests { // interval + interval if let ColumnarValue::Array(res1) = - evaluate_temporal_array(interval_array, 1, &interval_scalar)? + resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar)? { let res2 = interval_scalar.add(&interval_scalar)?.to_array(); assert_eq!( From e7662af04be7fbba0334bbc2d28841ad9000948d Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 19 Apr 2023 13:55:44 +0300 Subject: [PATCH 2/6] clippy fix --- datafusion-cli/Cargo.lock | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a0a713765396..cdfddbf917b5 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1168,9 +1168,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f" +checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" dependencies = [ "bytes", "fnv", @@ -1282,9 +1282,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.25" +version = "0.14.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5e554ff619822309ffd57d8734d77cd5ce6238bc956f037ea06c58238c9899" +checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" dependencies = [ "bytes", "futures-channel", @@ -1529,9 +1529,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" +checksum = "3f508063cc7bb32987c71511216bd5a32be15bccb6a80b52df8b9d7f01fc3aa2" [[package]] name = "lock_api" @@ -2151,9 +2151,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.11" +version = "0.37.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77" +checksum = "722529a737f5a942fdbac3a46cee213053196737c5eaa3386d52e85b786f2659" dependencies = [ "bitflags", "errno", From ef50aab140543aa38371d374c2f91a5419e06639 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 19 Apr 2023 14:49:38 +0300 Subject: [PATCH 3/6] clippy fix --- datafusion/physical-expr/src/expressions/datetime.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index 5981f97a47e8..240f729ecbaf 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -22,8 +22,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::Result; -use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::type_coercion::binary::coerce_types; use datafusion_expr::{ColumnarValue, Operator}; use std::any::Any; @@ -231,7 +230,7 @@ mod tests { use arrow_array::IntervalMonthDayNanoArray; use chrono::{Duration, NaiveDate}; use datafusion_common::delta::shift_months; - use datafusion_common::{Column, Result, ToDFSchema}; + use datafusion_common::{Column, Result, ScalarValue, ToDFSchema}; use datafusion_expr::Expr; use std::ops::Add; From 6e7311e5693c201243ddf3d008eef2abcfedc390 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Wed, 19 Apr 2023 18:27:08 +0300 Subject: [PATCH 4/6] Simple code refactor --- .../src/expressions/binary/kernels_arrow.rs | 60 ++++++++----------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index 6538cb4186ec..162e51186e57 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -279,21 +279,19 @@ pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result Result { match (left.data_type(), right.data_type()) { - (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { - ts_array_op(left, right) - } - (DataType::Interval(_), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Timestamp(..)) => ts_array_op(left, right), + (DataType::Interval(..), DataType::Interval(..)) => { interval_array_op(left, right, 1) } - (DataType::Timestamp(_, _), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Interval(..)) => { ts_interval_array_op(left, 1, right) } - (DataType::Interval(_), DataType::Timestamp(_, _)) => { + (DataType::Interval(..), DataType::Timestamp(..)) => { ts_interval_array_op(right, 1, left) } - (_, _) => { + _ => { // fall back to kernels in arrow-rs - Ok(arrow::compute::add_dyn(left, right)?) + Ok(add_dyn(left, right)?) } } } @@ -303,32 +301,29 @@ pub(crate) fn add_dyn_temporal_scalar( right: &ScalarValue, ) -> Result { match (left.data_type(), right.get_datatype()) { - (DataType::Date32, DataType::Interval(_)) => { + (DataType::Date32, DataType::Interval(..)) => { let left = as_date32_array(&left)?; let ret = Arc::new(try_unary::(left, |days| { Ok(date32_add(days, right, 1)?) })?) as ArrayRef; Ok(ColumnarValue::Array(ret)) } - (DataType::Date64, DataType::Interval(_)) => { + (DataType::Date64, DataType::Interval(..)) => { let left = as_date64_array(&left)?; let ret = Arc::new(try_unary::(left, |ms| { Ok(date64_add(ms, right, 1)?) })?) as ArrayRef; Ok(ColumnarValue::Array(ret)) } - (DataType::Interval(_), DataType::Interval(_)) => { + (DataType::Interval(..), DataType::Interval(..)) => { interval_scalar_interval_op(left, 1, right) } - (DataType::Timestamp(_, _), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Interval(..)) => { ts_scalar_interval_op(left, 1, right) } - (_, _) => { + _ => { // fall back to kernels in arrow-rs - Ok(ColumnarValue::Array(arrow::compute::add_dyn( - left, - &right.to_array(), - )?)) + Ok(ColumnarValue::Array(add_dyn(left, &right.to_array())?)) } } } @@ -348,21 +343,19 @@ pub(crate) fn subtract_dyn_temporal( right: &ArrayRef, ) -> Result { match (left.data_type(), right.data_type()) { - (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { - ts_array_op(left, right) - } - (DataType::Interval(_), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Timestamp(..)) => ts_array_op(left, right), + (DataType::Interval(..), DataType::Interval(..)) => { interval_array_op(left, right, -1) } - (DataType::Timestamp(_, _), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Interval(..)) => { ts_interval_array_op(left, -1, right) } - (DataType::Interval(_), DataType::Timestamp(_, _)) => { + (DataType::Interval(..), DataType::Timestamp(..)) => { ts_interval_array_op(right, -1, left) } - (_, _) => { + _ => { // fall back to kernels in arrow-rs - Ok(arrow::compute::subtract_dyn(left, right)?) + Ok(subtract_dyn(left, right)?) } } } @@ -372,35 +365,32 @@ pub(crate) fn subtract_dyn_temporal_scalar( right: &ScalarValue, ) -> Result { match (left.data_type(), right.get_datatype()) { - (DataType::Date32, DataType::Interval(_)) => { + (DataType::Date32, DataType::Interval(..)) => { let left = as_date32_array(&left)?; let ret = Arc::new(try_unary::(left, |days| { Ok(date32_add(days, right, -1)?) })?) as ArrayRef; Ok(ColumnarValue::Array(ret)) } - (DataType::Date64, DataType::Interval(_)) => { + (DataType::Date64, DataType::Interval(..)) => { let left = as_date64_array(&left)?; let ret = Arc::new(try_unary::(left, |ms| { Ok(date64_add(ms, right, -1)?) })?) as ArrayRef; Ok(ColumnarValue::Array(ret)) } - (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => { + (DataType::Timestamp(..), DataType::Timestamp(..)) => { ts_scalar_ts_op(left, right) } - (DataType::Interval(_), DataType::Interval(_)) => { + (DataType::Interval(..), DataType::Interval(..)) => { interval_scalar_interval_op(left, -1, right) } - (DataType::Timestamp(_, _), DataType::Interval(_)) => { + (DataType::Timestamp(..), DataType::Interval(..)) => { ts_scalar_interval_op(left, -1, right) } - (_, _) => { + _ => { // fall back to kernels in arrow-rs - Ok(ColumnarValue::Array(arrow::compute::subtract_dyn( - left, - &right.to_array(), - )?)) + Ok(ColumnarValue::Array(subtract_dyn(left, &right.to_array())?)) } } } From 84afde7d007b29f5265a19a55fb34e760a9cc98c Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 20 Apr 2023 13:07:27 +0300 Subject: [PATCH 5/6] fix clippy --- .../physical-expr/src/expressions/binary/kernels_arrow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index a1da6e4f5970..3b93d6f79258 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -29,8 +29,8 @@ use arrow_schema::DataType; use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array}; use datafusion_common::scalar::{date32_add, date64_add}; use datafusion_common::{DataFusionError, Result, ScalarValue}; -use datafusion_expr::ColumnarValue; use datafusion_expr::type_coercion::binary::decimal_op_mathematics_type; +use datafusion_expr::ColumnarValue; use datafusion_expr::Operator; use std::sync::Arc; From 67592e5e57c73fc73bb6e0080afc1f4b7b1d7c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Sun, 23 Apr 2023 19:50:22 +0300 Subject: [PATCH 6/6] Update datafusion/physical-expr/src/expressions/datetime.rs Co-authored-by: Liang-Chi Hsieh --- datafusion/physical-expr/src/expressions/datetime.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index 240f729ecbaf..6c7da105d910 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -144,7 +144,7 @@ impl PhysicalExpr for DateTimeIntervalExpr { })) } // This function evaluates temporal array vs scalar operations, such as timestamp - timestamp, - // interval + interval, timestamp + interval, and interval + timestamp. It takes two arrays as input + // interval + interval, timestamp + interval, and interval + timestamp. It takes one array and one scalar as input // and an integer sign representing the operation (+1 for addition and -1 for subtraction). (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(array_rhs)) => { resolve_temporal_op_scalar(&array_lhs, sign, &array_rhs)