Skip to content

Commit

Permalink
Removed last usages of scalar_inputs, scalar_input_types and inputs2 …
Browse files Browse the repository at this point in the history
…to use arrow unary/binary for performance (#12972)

* removed last uses of make_function_scalar_inputs

* delete make_function_scalar_inputs

* fix

* refactored other macros

* fix unary CI

* fix base f32/f64 mismatch not caught by tests

* import order changes

* Update log.rs

* stylistic changes

---------

Co-authored-by: berkaysynnada <[email protected]>
  • Loading branch information
buraksenn and berkaysynnada authored Oct 21, 2024
1 parent 69a4648 commit edeca39
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 221 deletions.
137 changes: 36 additions & 101 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,6 @@ macro_rules! make_stub_package {
};
}

/// Invokes a function on each element of an array and returns the result as a new array
///
/// Elements that are `None` when iterated are passed through as `None`
/// without calling `$FUNC`; only `Some` values are transformed.
///
/// $ARG: ArrayRef
/// $NAME: name of the function (for error messages)
/// $ARG_TYPE: the type of array to cast the argument to
/// $RETURN_TYPE: the type of array to return (the collected output type)
/// $FUNC: a block that evaluates to the callable applied to each element of $ARG
macro_rules! make_function_scalar_inputs_return_type {
($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC: block) => {{
// `downcast_arg!` is defined elsewhere in this file; per its doc comment
// it yields an internal error if the cast to `$ARG_TYPE` fails.
let arg = downcast_arg!($ARG, $NAME, $ARG_TYPE);

arg.iter()
// `$FUNC` is a block expression, so `$FUNC(a)` evaluates the block and
// then calls the resulting function/closure with the element.
.map(|a| match a {
Some(a) => Some($FUNC(a)),
_ => None,
})
.collect::<$RETURN_TYPE>()
}};
}

/// Downcast an argument to a specific array type, returning an internal error
/// if the cast fails
///
Expand Down Expand Up @@ -168,9 +148,9 @@ macro_rules! make_math_unary_udf {
use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{DataType, Float32Type, Float64Type};
use datafusion_common::{exec_err, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
Expand Down Expand Up @@ -231,24 +211,16 @@ macro_rules! make_math_unary_udf {
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => {
Arc::new(make_function_scalar_inputs_return_type!(
&args[0],
self.name(),
Float64Array,
Float64Array,
{ f64::$UNARY_FUNC }
))
}
DataType::Float32 => {
Arc::new(make_function_scalar_inputs_return_type!(
&args[0],
self.name(),
Float32Array,
Float32Array,
{ f32::$UNARY_FUNC }
))
}
DataType::Float64 => Arc::new(
args[0]
.as_primitive::<Float64Type>()
.unary::<_, Float64Type>(|x: f64| f64::$UNARY_FUNC(x)),
) as ArrayRef,
DataType::Float32 => Arc::new(
args[0]
.as_primitive::<Float32Type>()
.unary::<_, Float32Type>(|x: f32| f32::$UNARY_FUNC(x)),
) as ArrayRef,
other => {
return exec_err!(
"Unsupported data type {other:?} for function {}",
Expand Down Expand Up @@ -286,9 +258,9 @@ macro_rules! make_math_binary_udf {
use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{DataType, Float32Type, Float64Type};
use datafusion_common::{exec_err, Result};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::TypeSignature;
use datafusion_expr::{
Expand Down Expand Up @@ -347,23 +319,26 @@ macro_rules! make_math_binary_udf {
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"y",
"x",
Float64Array,
{ f64::$BINARY_FUNC }
)),

DataType::Float32 => Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"y",
"x",
Float32Array,
{ f32::$BINARY_FUNC }
)),
DataType::Float64 => {
let y = args[0].as_primitive::<Float64Type>();
let x = args[1].as_primitive::<Float64Type>();
let result = arrow::compute::binary::<_, _, _, Float64Type>(
y,
x,
|y, x| f64::$BINARY_FUNC(y, x),
)?;
Arc::new(result) as _
}
DataType::Float32 => {
let y = args[0].as_primitive::<Float32Type>();
let x = args[1].as_primitive::<Float32Type>();
let result = arrow::compute::binary::<_, _, _, Float32Type>(
y,
x,
|y, x| f32::$BINARY_FUNC(y, x),
)?;
Arc::new(result) as _
}
other => {
return exec_err!(
"Unsupported data type {other:?} for function {}",
Expand All @@ -382,43 +357,3 @@ macro_rules! make_math_binary_udf {
}
};
}

/// Applies a function to every element of a single array and collects the
/// results into a new array of the same array type.
///
/// Elements that are `None` when iterated stay `None`; `$FUNC` is only
/// called for `Some` values.
///
/// $ARG: the array argument to downcast
/// $NAME: name passed to `downcast_arg!` (presumably used in its error message)
/// $ARRAY_TYPE: the array type used both for the downcast and for the output
/// $FUNC: a block that evaluates to the callable applied to each element
macro_rules! make_function_scalar_inputs {
($ARG: expr, $NAME:expr, $ARRAY_TYPE:ident, $FUNC: block) => {{
// `downcast_arg!` is defined elsewhere in this file.
let arg = downcast_arg!($ARG, $NAME, $ARRAY_TYPE);

arg.iter()
// `$FUNC` is a block expression; `$FUNC(a)` evaluates it and calls the
// resulting function/closure with the element.
.map(|a| match a {
Some(a) => Some($FUNC(a)),
_ => None,
})
.collect::<$ARRAY_TYPE>()
}};
}

/// Applies a two-argument function pairwise over two arrays and collects the
/// results into a new array.
///
/// The two inputs are iterated in lockstep with `zip`. `$FUNC` is only called
/// when both elements of a pair are `Some`; otherwise the result for that
/// pair is `None`.
///
/// Two forms are accepted:
/// - one `$ARRAY_TYPE`: both inputs are downcast to it and the output is
///   collected into it;
/// - `$ARRAY_TYPE1` and `$ARRAY_TYPE2`: each input is downcast to its own
///   type and the output is collected into `$ARRAY_TYPE1`.
///
/// $ARG1, $ARG2: the array arguments to downcast
/// $NAME1, $NAME2: names passed to `downcast_arg!` for the respective argument
/// $FUNC: a block that evaluates to the callable applied to each pair
macro_rules! make_function_inputs2 {
// Form 1: both inputs share a single array type.
($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{
// `downcast_arg!` is defined elsewhere in this file.
let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE);
let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE);

arg1.iter()
.zip(arg2.iter())
.map(|(a1, a2)| match (a1, a2) {
// The second value goes through `try_into()`. If that conversion
// fails, `.ok()?` makes this per-element closure return `None`
// for the pair instead of surfacing an error.
(Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().ok()?)),
_ => None,
})
.collect::<$ARRAY_TYPE>()
}};
// Form 2: the inputs have different array types; output uses the first.
($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE1:ident, $ARRAY_TYPE2:ident, $FUNC: block) => {{
let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE1);
let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE2);

arg1.iter()
.zip(arg2.iter())
.map(|(a1, a2)| match (a1, a2) {
// Same conversion-or-`None` handling of the second value as in
// the single-type form.
(Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().ok()?)),
_ => None,
})
.collect::<$ARRAY_TYPE1>()
}};
}
57 changes: 30 additions & 27 deletions datafusion/functions/src/math/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ use std::sync::{Arc, OnceLock};

use super::power::PowerFunc;

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{DataType, Float32Type, Float64Type};
use datafusion_common::{
exec_err, internal_err, plan_datafusion_err, plan_err, DataFusionError, Result,
ScalarValue,
exec_err, internal_err, plan_datafusion_err, plan_err, Result, ScalarValue,
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH;
Expand Down Expand Up @@ -140,37 +139,40 @@ impl ScalarUDFImpl for LogFunc {
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => match base {
ColumnarValue::Scalar(ScalarValue::Float32(Some(base))) => {
Arc::new(make_function_scalar_inputs!(x, "x", Float64Array, {
|value: f64| f64::log(value, base as f64)
}))
Arc::new(x.as_primitive::<Float64Type>().unary::<_, Float64Type>(
|value: f64| f64::log(value, base as f64),
))
}
ColumnarValue::Array(base) => {
let x = x.as_primitive::<Float64Type>();
let base = base.as_primitive::<Float64Type>();
let result = arrow::compute::binary::<_, _, _, Float64Type>(
x,
base,
f64::log,
)?;
Arc::new(result) as _
}
ColumnarValue::Array(base) => Arc::new(make_function_inputs2!(
x,
base,
"x",
"base",
Float64Array,
{ f64::log }
)),
_ => {
return exec_err!("log function requires a scalar or array for base")
}
},

DataType::Float32 => match base {
ColumnarValue::Scalar(ScalarValue::Float32(Some(base))) => {
Arc::new(make_function_scalar_inputs!(x, "x", Float32Array, {
|value: f32| f32::log(value, base)
}))
ColumnarValue::Scalar(ScalarValue::Float32(Some(base))) => Arc::new(
x.as_primitive::<Float32Type>()
.unary::<_, Float32Type>(|value: f32| f32::log(value, base)),
),
ColumnarValue::Array(base) => {
let x = x.as_primitive::<Float32Type>();
let base = base.as_primitive::<Float32Type>();
let result = arrow::compute::binary::<_, _, _, Float32Type>(
x,
base,
f32::log,
)?;
Arc::new(result) as _
}
ColumnarValue::Array(base) => Arc::new(make_function_inputs2!(
x,
base,
"x",
"base",
Float32Array,
{ f32::log }
)),
_ => {
return exec_err!("log function requires a scalar or array for base")
}
Expand Down Expand Up @@ -259,6 +261,7 @@ mod tests {

use super::*;

use arrow::array::{Float32Array, Float64Array};
use arrow::compute::SortOptions;
use datafusion_common::cast::{as_float32_array, as_float64_array};
use datafusion_common::DFSchema;
Expand Down
39 changes: 17 additions & 22 deletions datafusion/functions/src/math/nanvl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
use std::any::Any;
use std::sync::{Arc, OnceLock};

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Float32, Float64};
use crate::utils::make_scalar_function;

use arrow::array::{ArrayRef, AsArray, Float32Array, Float64Array};
use arrow::datatypes::DataType::{Float32, Float64};
use arrow::datatypes::{DataType, Float32Type, Float64Type};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH;
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};

use crate::utils::make_scalar_function;

#[derive(Debug)]
pub struct NanvlFunc {
signature: Signature,
Expand Down Expand Up @@ -113,14 +112,11 @@ fn nanvl(args: &[ArrayRef]) -> Result<ArrayRef> {
}
};

Ok(Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"x",
"y",
Float64Array,
{ compute_nanvl }
)) as ArrayRef)
let x = args[0].as_primitive() as &Float64Array;
let y = args[1].as_primitive() as &Float64Array;
arrow::compute::binary::<_, _, _, Float64Type>(x, y, compute_nanvl)
.map(|res| Arc::new(res) as _)
.map_err(DataFusionError::from)
}
Float32 => {
let compute_nanvl = |x: f32, y: f32| {
Expand All @@ -131,25 +127,24 @@ fn nanvl(args: &[ArrayRef]) -> Result<ArrayRef> {
}
};

Ok(Arc::new(make_function_inputs2!(
&args[0],
&args[1],
"x",
"y",
Float32Array,
{ compute_nanvl }
)) as ArrayRef)
let x = args[0].as_primitive() as &Float32Array;
let y = args[1].as_primitive() as &Float32Array;
arrow::compute::binary::<_, _, _, Float32Type>(x, y, compute_nanvl)
.map(|res| Arc::new(res) as _)
.map_err(DataFusionError::from)
}
other => exec_err!("Unsupported data type {other:?} for function nanvl"),
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use crate::math::nanvl::nanvl;

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use datafusion_common::cast::{as_float32_array, as_float64_array};
use std::sync::Arc;

#[test]
fn test_nanvl_f64() {
Expand Down
Loading

0 comments on commit edeca39

Please sign in to comment.