From 47c86e622a71df6fabc6f8e224a88b4914caa6b9 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Wed, 28 Aug 2024 16:28:38 +0800 Subject: [PATCH 01/10] Add `distance` aggregation function Signed-off-by: Austin Liu Add `distance` aggregation function Signed-off-by: Austin Liu --- .../functions-aggregate/src/distance.rs | 219 ++++++++++++++++++ datafusion/functions-aggregate/src/lib.rs | 3 + 2 files changed, 222 insertions(+) create mode 100644 datafusion/functions-aggregate/src/distance.rs diff --git a/datafusion/functions-aggregate/src/distance.rs b/datafusion/functions-aggregate/src/distance.rs new file mode 100644 index 000000000000..3f2a1877883a --- /dev/null +++ b/datafusion/functions-aggregate/src/distance.rs @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`Distance`]: Euclidean distance aggregations. + +use std::any::Any; +use std::fmt::Debug; + +use arrow::compute::kernels::cast; +use arrow::compute::{and, filter, is_not_null}; +use arrow::{ + array::{ArrayRef, Float64Array}, + datatypes::{DataType, Field}, +}; + +use datafusion_common::{ + downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result, + ScalarValue, +}; +use datafusion_expr::{ + function::{AccumulatorArgs, StateFieldsArgs}, + type_coercion::aggregates::NUMERICS, + utils::format_state_name, + Accumulator, AggregateUDFImpl, Signature, Volatility, +}; + +make_udaf_expr_and_func!( + Distance, + dis, + y x, + "Distance between two numeric values.", + dis_udaf +); + +#[derive(Debug)] +pub struct Distance { + signature: Signature, +} + +impl Default for Distance { + fn default() -> Self { + Self::new() + } +} + +impl Distance { + pub fn new() -> Self { + Self { + signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable), + } + } +} + +impl AggregateUDFImpl for Distance { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "distance" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if !arg_types[0].is_numeric() { + return plan_err!("Distance requires numeric input types"); + } + + Ok(DataType::Float64) + } + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + Ok(Box::new(DistanceAccumulator::try_new()?)) + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + let name = args.name; + Ok(vec![Field::new( + format_state_name(name, "sum_of_squares"), + DataType::Float64, + true, + )]) + } +} + +/// An accumulator to compute distance of two numeric columns +#[derive(Debug)] +pub struct DistanceAccumulator { + sum_of_squares: f64, +} + +impl DistanceAccumulator { + /// Creates a new `DistanceAccumulator` + pub fn try_new() -> Result { + Ok(Self { + sum_of_squares: 0_f64, + }) + } +} + +impl Accumulator for DistanceAccumulator { + fn state(&mut self) -> Result> { + Ok(vec![ScalarValue::from(self.sum_of_squares)]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { + let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let values1 = filter(&values[0], &mask)?; + let values2 = filter(&values[1], &mask)?; + + vec![values1, values2] + } else { + values.to_vec() + }; + + let values1 = &cast(&values[0], &DataType::Float64)?; + let values2 = &cast(&values[1], &DataType::Float64)?; + + let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); + let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); + + for i in 0..values1.len() { + let value1 = if values1.is_valid(i) { + arr1.next() + } else { + None + }; + let value2 = if values2.is_valid(i) { + arr2.next() + } else { + None + }; + if value1.is_none() || value2.is_none() { + continue; + } + + let value1 = unwrap_or_internal_err!(value1); + let value2 = unwrap_or_internal_err!(value2); + + self.sum_of_squares += (value1 - value2).powi(2); + } + Ok(()) + } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { + let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let values1 = filter(&values[0], &mask)?; + let values2 = filter(&values[1], &mask)?; + + vec![values1, values2] + } else { + values.to_vec() + }; + + let values1 = &cast(&values[0], &DataType::Float64)?; + let values2 = &cast(&values[1], &DataType::Float64)?; + let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); + let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); + + for i in 0..values1.len() { + let value1 = if values1.is_valid(i) { + arr1.next() + } else { + None + }; + let value2 = if values2.is_valid(i) { + arr2.next() + } else { + None + }; + + if value1.is_none() || value2.is_none() { + continue; + } + + let value1 = unwrap_or_internal_err!(value1); + let value2 = unwrap_or_internal_err!(value2); + + let diff = value1 - value2; + self.sum_of_squares -= diff.powi(2); + } + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let list_sum_of_squares = downcast_value!(states[0], Float64Array); + for i in 0..list_sum_of_squares.len() { + self.sum_of_squares += list_sum_of_squares.value(i); + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Float64(Some(self.sum_of_squares.sqrt()))) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index b54cd181a0cb..c4c56f8a11e0 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -62,6 +62,7 @@ pub mod array_agg; pub mod correlation; pub mod count; pub mod covariance; +pub mod distance; pub mod first_last; pub mod hyperloglog; pub mod median; @@ -107,6 +108,7 @@ pub mod expr_fn { pub use super::count::count_distinct; pub use super::covariance::covar_pop; pub use super::covariance::covar_samp; + pub use super::distance::dis; pub use super::first_last::first_value; pub use super::first_last::last_value; pub use super::grouping::grouping; @@ -138,6 +140,7 @@ pub fn all_default_aggregate_functions() -> Vec> { covariance::covar_samp_udaf(), covariance::covar_pop_udaf(), correlation::corr_udaf(), + distance::dis_udaf(), sum::sum_udaf(), min_max::max_udaf(), min_max::min_udaf(), From 2d66d3e7d637df8e37b13211a5f4dcfb3bfaf3d5 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Wed, 28 Aug 2024 16:34:12 +0800 Subject: [PATCH 02/10] Add sql logic test for `distance` Signed-off-by: Austin Liu --- .../sqllogictest/test_files/aggregate.slt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 322ddcdb047b..ed5048887a92 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -416,6 +416,50 @@ from data ---- 1 +# csv_query_distance +query R +SELECT distance(c2, c12) FROM aggregate_test_100 +---- +27.565541154252 + +# single_row_query_distance +query R +select distance(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq +---- +1.1 + +# all_nulls_query_distance +query R +with data as ( + select null::int as f, null::int as b + union all + select null::int as f, null::int as b +) +select distance(f, b) +from data +---- +0 + +# distance_query_with_nulls +query R +with data as ( + select 1 as f, 4 as b + union all + select null as f, 99 as b + union all + select 2 as f, 5 as b + union all + select 98 as f, null as b + union all + select 3 as f, 6 as b + union all + select null as f, null as b +) +select distance(f, b) +from data +---- +5.196152422707 + # csv_query_variance_1 query R SELECT var_pop(c2) FROM aggregate_test_100 From 8a27c311c9c95f9d34eb1f0011943518758cb8e6 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Wed, 28 Aug 2024 16:45:40 +0800 Subject: [PATCH 03/10] Simplify diff calculation Signed-off-by: Austin Liu --- datafusion/functions-aggregate/src/distance.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/distance.rs b/datafusion/functions-aggregate/src/distance.rs index 3f2a1877883a..ff83b9a26587 100644 --- a/datafusion/functions-aggregate/src/distance.rs +++ b/datafusion/functions-aggregate/src/distance.rs @@ -194,8 +194,7 @@ impl Accumulator for DistanceAccumulator { let value1 = unwrap_or_internal_err!(value1); let value2 = unwrap_or_internal_err!(value2); - let diff = value1 - value2; - self.sum_of_squares -= diff.powi(2); + self.sum_of_squares -= (value1 - value2).powi(2); } Ok(()) From 2c8c0c4b759cd6574a5c4bec3e8fd50b5a267a57 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 01:09:16 +0800 Subject: [PATCH 04/10] Add `array_distance`/`list_distance` as list function in functions-nested Signed-off-by: Austin Liu --- datafusion/functions-nested/src/distance.rs | 213 +++++++++++++++++++ datafusion/functions-nested/src/lib.rs | 3 + datafusion/sqllogictest/test_files/array.slt | 54 +++++ 3 files changed, 270 insertions(+) create mode 100644 datafusion/functions-nested/src/distance.rs diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs new file mode 100644 index 000000000000..1c631cd7e800 --- /dev/null +++ b/datafusion/functions-nested/src/distance.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [ScalarUDFImpl] definitions for array_distance function. + +use crate::utils::{downcast_arg, make_scalar_function}; +use arrow_array::{ + Array, ArrayRef, ListArray, LargeListArray, OffsetSizeTrait, Float64Array, +}; +use arrow_schema::DataType; +use arrow_schema::DataType::{FixedSizeList, LargeList, List, Float64}; +use core::any::type_name; +use datafusion_common::cast::{as_generic_list_array, as_int32_array, as_int64_array, as_float32_array, as_float64_array}; +use datafusion_common::DataFusionError; +use datafusion_common::{exec_err, plan_err, Result}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; +use std::sync::Arc; + +make_udf_expr_and_func!( + ArrayDistance, + array_distance, + array, + "returns the Euclidean distance between two numeric arrays.", + array_distance_udf +); + +#[derive(Debug)] +pub(super) struct ArrayDistance { + signature: Signature, + aliases: Vec, +} + +impl ArrayDistance { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec!["list_distance".to_string()], + } + } +} + +impl ScalarUDFImpl for ArrayDistance { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "array_distance" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Ok(Float64), + _ => plan_err!("The array_distance function can only accept List/LargeList/FixedSizeList."), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + make_scalar_function(array_distance_inner)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +pub fn array_distance_inner(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return exec_err!("array_distance expects exactly two arguments"); + } + + match (&args[0].data_type(), &args[1].data_type()) { + (List(_), List(_)) => { + general_array_distance::(args) + } + (LargeList(_), LargeList(_)) => { + general_array_distance::(args) + } + (array_type1, array_type2) => { + exec_err!("array_distance does not support types '{array_type1:?}' and '{array_type2:?}'") + } + } +} + +fn general_array_distance(arrays: &[ArrayRef]) -> Result { + let list_array1 = as_generic_list_array::(&arrays[0])?; + let list_array2 = as_generic_list_array::(&arrays[1])?; + + let result = list_array1 + .iter() + .zip(list_array2.iter()) + .map(|(arr1, arr2)| compute_array_distance(arr1, arr2)) + .collect::>()?; + + Ok(Arc::new(result) as ArrayRef) +} + +/// Computes the Euclidean distance between two arrays +fn compute_array_distance( + arr1: Option, + arr2: Option, +) -> Result> { + let value1 = match arr1 { + Some(arr) => arr, + None => return Ok(None), + }; + let value2 = match arr2 { + Some(arr) => arr, + None => return Ok(None), + }; + + let mut value1 = value1; + let mut value2 = value2; + + loop { + match value1.data_type() { + List(_) => { + if downcast_arg!(value1, ListArray).null_count() > 0 { + return Ok(None); + } + value1 = downcast_arg!(value1, ListArray).value(0); + } + LargeList(_) => { + if downcast_arg!(value1, LargeListArray).null_count() > 0 { + return Ok(None); + } + value1 = downcast_arg!(value1, LargeListArray).value(0); + } + _ => break, + } + + match value2.data_type() { + List(_) => { + if downcast_arg!(value2, ListArray).null_count() > 0 { + return Ok(None); + } + value2 = downcast_arg!(value2, ListArray).value(0); + } + LargeList(_) => { + if downcast_arg!(value2, LargeListArray).null_count() > 0 { + return Ok(None); + } + value2 = downcast_arg!(value2, LargeListArray).value(0); + } + _ => break, + } + } + + // Check for NULL values inside the arrays + if value1.null_count() != 0 || value2.null_count() != 0 { + return Ok(None); + } + + let values1 = convert_to_f64_array(&value1)?; + let values2 = convert_to_f64_array(&value2)?; + + if values1.len()!= values2.len() { + return exec_err!("Both arrays must have the same length"); + } + + let sum_squares: f64 = values1 + .iter() + .zip(values2.iter()) + .map(|(v1, v2)| { + let diff = v1.unwrap_or(0.0) - v2.unwrap_or(0.0); + diff * diff + }) + .sum(); + + Ok(Some(sum_squares.sqrt())) +} + +/// Converts an array of any numeric type to a Float64Array. +fn convert_to_f64_array(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Float64 => Ok(as_float64_array(array)?.clone()), + DataType::Float32 => { + let array = as_float32_array(array)?; + let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + Ok(converted) + } + DataType::Int64 => { + let array = as_int64_array(array)?; + let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + Ok(converted) + } + DataType::Int32 => { + let array = as_int32_array(array)?; + let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + Ok(converted) + } + _ => exec_err!("Unsupported array type for conversion to Float64Array"), + } +} diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index ef2c5e709bc1..487baa16b2ec 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -34,6 +34,7 @@ pub mod array_has; pub mod cardinality; pub mod concat; pub mod dimension; +pub mod distance; pub mod empty; pub mod except; pub mod expr_ext; @@ -72,6 +73,7 @@ pub mod expr_fn { pub use super::concat::array_prepend; pub use super::dimension::array_dims; pub use super::dimension::array_ndims; + pub use super::distance::array_distance; pub use super::empty::array_empty; pub use super::except::array_except; pub use super::extract::array_element; @@ -126,6 +128,7 @@ pub fn all_default_nested_functions() -> Vec> { array_has::array_has_any_udf(), empty::array_empty_udf(), length::array_length_udf(), + distance::array_distance_udf(), flatten::flatten_udf(), sort::array_sort_udf(), repeat::array_repeat_udf(), diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index b97ecced57e3..1f85a82e6591 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4715,6 +4715,60 @@ NULL 10 NULL 10 NULL 10 +query RRR +select array_distance([2], [3]), list_distance([1], [2]), list_distance([1], [-2]); +---- +1 1 3 + +query error +select list_distance([1], [1, 2]); + +query R +select array_distance([[1, 1]], [1, 2]); +---- +1 + +query R +select array_distance([[1, 1]], [[1, 2]]); +---- +1 + +query R +select array_distance([[1, 1]], [[1, 2]]); +---- +1 + +query RR +select array_distance([1, 1, 0, 0], [2, 2, 1, 1]), list_distance([1, 2, 3], [1, 2, 3]); +---- +2 0 + +query RR +select array_distance([1.0, 1, 0, 0], [2, 2.0, 1, 1]), list_distance([1, 2.0, 3], [1, 2, 3]); +---- +2 0 + +query R +select list_distance([1, 1, NULL, 0], [2, 2, NULL, NULL]); +---- +NULL + +query R +select list_distance([NULL, NULL], [NULL, NULL]); +---- +NULL + +query R +select list_distance([1.0, 2.0, 3.0], [1.0, 2.0, 3.5]) AS distance; +---- +0.5 + +query R +select list_distance([1, 2, 3], [1, 2, 3]) AS distance; +---- +0 + + ## array_dims (aliases: `list_dims`) # array dims error From b4914548c6b78dd20b818eb1de6c4c9805e4d1c8 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 01:11:32 +0800 Subject: [PATCH 05/10] Remove aggregate function `distance` Signed-off-by: Austin Liu --- .../functions-aggregate/src/distance.rs | 218 ------------------ datafusion/functions-aggregate/src/lib.rs | 3 - .../sqllogictest/test_files/aggregate.slt | 44 ---- 3 files changed, 265 deletions(-) delete mode 100644 datafusion/functions-aggregate/src/distance.rs diff --git a/datafusion/functions-aggregate/src/distance.rs b/datafusion/functions-aggregate/src/distance.rs deleted file mode 100644 index ff83b9a26587..000000000000 --- a/datafusion/functions-aggregate/src/distance.rs +++ /dev/null @@ -1,218 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`Distance`]: Euclidean distance aggregations. - -use std::any::Any; -use std::fmt::Debug; - -use arrow::compute::kernels::cast; -use arrow::compute::{and, filter, is_not_null}; -use arrow::{ - array::{ArrayRef, Float64Array}, - datatypes::{DataType, Field}, -}; - -use datafusion_common::{ - downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result, - ScalarValue, -}; -use datafusion_expr::{ - function::{AccumulatorArgs, StateFieldsArgs}, - type_coercion::aggregates::NUMERICS, - utils::format_state_name, - Accumulator, AggregateUDFImpl, Signature, Volatility, -}; - -make_udaf_expr_and_func!( - Distance, - dis, - y x, - "Distance between two numeric values.", - dis_udaf -); - -#[derive(Debug)] -pub struct Distance { - signature: Signature, -} - -impl Default for Distance { - fn default() -> Self { - Self::new() - } -} - -impl Distance { - pub fn new() -> Self { - Self { - signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable), - } - } -} - -impl AggregateUDFImpl for Distance { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "distance" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - if !arg_types[0].is_numeric() { - return plan_err!("Distance requires numeric input types"); - } - - Ok(DataType::Float64) - } - fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { - Ok(Box::new(DistanceAccumulator::try_new()?)) - } - - fn state_fields(&self, args: StateFieldsArgs) -> Result> { - let name = args.name; - Ok(vec![Field::new( - format_state_name(name, "sum_of_squares"), - DataType::Float64, - true, - )]) - } -} - -/// An accumulator to compute distance of two numeric columns -#[derive(Debug)] -pub struct DistanceAccumulator { - sum_of_squares: f64, -} - -impl DistanceAccumulator { - /// Creates a new `DistanceAccumulator` - pub fn try_new() -> Result { - Ok(Self { - sum_of_squares: 0_f64, - }) - } -} - -impl Accumulator for DistanceAccumulator { - fn state(&mut self) -> Result> { - Ok(vec![ScalarValue::from(self.sum_of_squares)]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { - let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; - let values1 = filter(&values[0], &mask)?; - let values2 = filter(&values[1], &mask)?; - - vec![values1, values2] - } else { - values.to_vec() - }; - - let values1 = &cast(&values[0], &DataType::Float64)?; - let values2 = &cast(&values[1], &DataType::Float64)?; - - let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); - let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); - - for i in 0..values1.len() { - let value1 = if values1.is_valid(i) { - arr1.next() - } else { - None - }; - let value2 = if values2.is_valid(i) { - arr2.next() - } else { - None - }; - if value1.is_none() || value2.is_none() { - continue; - } - - let value1 = unwrap_or_internal_err!(value1); - let value2 = unwrap_or_internal_err!(value2); - - self.sum_of_squares += (value1 - value2).powi(2); - } - Ok(()) - } - - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { - let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; - let values1 = filter(&values[0], &mask)?; - let values2 = filter(&values[1], &mask)?; - - vec![values1, values2] - } else { - values.to_vec() - }; - - let values1 = &cast(&values[0], &DataType::Float64)?; - let values2 = &cast(&values[1], &DataType::Float64)?; - let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); - let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); - - for i in 0..values1.len() { - let value1 = if values1.is_valid(i) { - arr1.next() - } else { - None - }; - let value2 = if values2.is_valid(i) { - arr2.next() - } else { - None - }; - - if value1.is_none() || value2.is_none() { - continue; - } - - let value1 = unwrap_or_internal_err!(value1); - let value2 = unwrap_or_internal_err!(value2); - - self.sum_of_squares -= (value1 - value2).powi(2); - } - - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let list_sum_of_squares = downcast_value!(states[0], Float64Array); - for i in 0..list_sum_of_squares.len() { - self.sum_of_squares += list_sum_of_squares.value(i); - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Float64(Some(self.sum_of_squares.sqrt()))) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index c4c56f8a11e0..b54cd181a0cb 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -62,7 +62,6 @@ pub mod array_agg; pub mod correlation; pub mod count; pub mod covariance; -pub mod distance; pub mod first_last; pub mod hyperloglog; pub mod median; @@ -108,7 +107,6 @@ pub mod expr_fn { pub use super::count::count_distinct; pub use super::covariance::covar_pop; pub use super::covariance::covar_samp; - pub use super::distance::dis; pub use super::first_last::first_value; pub use super::first_last::last_value; pub use super::grouping::grouping; @@ -140,7 +138,6 @@ pub fn all_default_aggregate_functions() -> Vec> { covariance::covar_samp_udaf(), covariance::covar_pop_udaf(), correlation::corr_udaf(), - distance::dis_udaf(), sum::sum_udaf(), min_max::max_udaf(), min_max::min_udaf(), diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index ed5048887a92..322ddcdb047b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -416,50 +416,6 @@ from data ---- 1 -# csv_query_distance -query R -SELECT distance(c2, c12) FROM aggregate_test_100 ----- -27.565541154252 - -# single_row_query_distance -query R -select distance(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq ----- -1.1 - -# all_nulls_query_distance -query R -with data as ( - select null::int as f, null::int as b - union all - select null::int as f, null::int as b -) -select distance(f, b) -from data ----- -0 - -# distance_query_with_nulls -query R -with data as ( - select 1 as f, 4 as b - union all - select null as f, 99 as b - union all - select 2 as f, 5 as b - union all - select 98 as f, null as b - union all - select 3 as f, 6 as b - union all - select null as f, null as b -) -select distance(f, b) -from data ----- -5.196152422707 - # csv_query_variance_1 query R SELECT var_pop(c2) FROM aggregate_test_100 From a55617e3b6a2b82169f212c0846fa0d9a8a7aa69 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 01:18:28 +0800 Subject: [PATCH 06/10] format Signed-off-by: Austin Liu --- datafusion/functions-nested/src/distance.rs | 36 +++++++++++---------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs index 1c631cd7e800..4e188cd4d964 100644 --- a/datafusion/functions-nested/src/distance.rs +++ b/datafusion/functions-nested/src/distance.rs @@ -19,12 +19,15 @@ use crate::utils::{downcast_arg, make_scalar_function}; use arrow_array::{ - Array, ArrayRef, ListArray, LargeListArray, OffsetSizeTrait, Float64Array, + Array, ArrayRef, Float64Array, LargeListArray, ListArray, OffsetSizeTrait, }; use arrow_schema::DataType; -use arrow_schema::DataType::{FixedSizeList, LargeList, List, Float64}; +use arrow_schema::DataType::{FixedSizeList, Float64, LargeList, List}; use core::any::type_name; -use datafusion_common::cast::{as_generic_list_array, as_int32_array, as_int64_array, as_float32_array, as_float64_array}; +use datafusion_common::cast::{ + as_float32_array, as_float64_array, as_generic_list_array, as_int32_array, + as_int64_array, +}; use datafusion_common::DataFusionError; use datafusion_common::{exec_err, plan_err, Result}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; @@ -89,12 +92,8 @@ pub fn array_distance_inner(args: &[ArrayRef]) -> Result { } match (&args[0].data_type(), &args[1].data_type()) { - (List(_), List(_)) => { - general_array_distance::(args) - } - (LargeList(_), LargeList(_)) => { - general_array_distance::(args) - } + (List(_), List(_)) => general_array_distance::(args), + (LargeList(_), LargeList(_)) => general_array_distance::(args), (array_type1, array_type2) => { exec_err!("array_distance does not support types '{array_type1:?}' and '{array_type2:?}'") } @@ -173,18 +172,18 @@ fn compute_array_distance( let values1 = convert_to_f64_array(&value1)?; let values2 = convert_to_f64_array(&value2)?; - if values1.len()!= values2.len() { + if values1.len() != values2.len() { return exec_err!("Both arrays must have the same length"); } let sum_squares: f64 = values1 - .iter() - .zip(values2.iter()) - .map(|(v1, v2)| { + .iter() + .zip(values2.iter()) + .map(|(v1, v2)| { let diff = v1.unwrap_or(0.0) - v2.unwrap_or(0.0); diff * diff }) - .sum(); + .sum(); Ok(Some(sum_squares.sqrt())) } @@ -195,17 +194,20 @@ fn convert_to_f64_array(array: &ArrayRef) -> Result { DataType::Float64 => Ok(as_float64_array(array)?.clone()), DataType::Float32 => { let array = as_float32_array(array)?; - let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + let converted: Float64Array = + array.iter().map(|v| v.map(|v| v as f64)).collect(); Ok(converted) } DataType::Int64 => { let array = as_int64_array(array)?; - let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + let converted: Float64Array = + array.iter().map(|v| v.map(|v| v as f64)).collect(); Ok(converted) } DataType::Int32 => { let array = as_int32_array(array)?; - let converted: Float64Array = array.iter().map(|v| v.map(|v| v as f64)).collect(); + let converted: Float64Array = + array.iter().map(|v| v.map(|v| v as f64)).collect(); Ok(converted) } _ => exec_err!("Unsupported array type for conversion to Float64Array"), From 32d4d8d54e126835cc6065b723b571d967f9c0aa Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 01:23:24 +0800 Subject: [PATCH 07/10] clean up error handling Signed-off-by: Austin Liu --- datafusion/functions-nested/src/distance.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs index 4e188cd4d964..4d779dc1fabf 100644 --- a/datafusion/functions-nested/src/distance.rs +++ b/datafusion/functions-nested/src/distance.rs @@ -29,7 +29,7 @@ use datafusion_common::cast::{ as_int64_array, }; use datafusion_common::DataFusionError; -use datafusion_common::{exec_err, plan_err, Result}; +use datafusion_common::{exec_err, Result}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; use std::sync::Arc; @@ -73,7 +73,7 @@ impl ScalarUDFImpl for ArrayDistance { fn return_type(&self, arg_types: &[DataType]) -> Result { match arg_types[0] { List(_) | LargeList(_) | FixedSizeList(_, _) => Ok(Float64), - _ => plan_err!("The array_distance function can only accept List/LargeList/FixedSizeList."), + _ => exec_err!("The array_distance function can only accept List/LargeList/FixedSizeList."), } } From 4052cb083b159d46ce100f1ca33234ebd97078c4 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 11:08:47 +0800 Subject: [PATCH 08/10] Add `array_distance` in scalar array functions docs Signed-off-by: Austin Liu --- .../source/user-guide/sql/scalar_functions.md | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index c7490df04983..808fe30eacd5 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2093,6 +2093,7 @@ to_unixtime(expression[, ..., format_n]) - [array_concat](#array_concat) - [array_contains](#array_contains) - [array_dims](#array_dims) +- [array_distance](#array_distance) - [array_distinct](#array_distinct) - [array_has](#array_has) - [array_has_all](#array_has_all) @@ -2388,6 +2389,36 @@ array_dims(array) - list_dims +### `array_distance` + +Returns the Euclidean distance between two input arrays of equal length. + +``` +array_distance(array1, array2) +``` + +#### Arguments + +- **array1**: Array expression. + Can be a constant, column, or function, and any combination of array operators. +- **array2**: Array expression. + Can be a constant, column, or function, and any combination of array operators. + +#### Example + +``` +> select array_distance([1, 2], [1, 4]); ++---------------------------------+ +| array_distance(List([1,2], [1,4])) | ++---------------------------------+ +| 2.0 | ++---------------------------------+ +``` + +#### Aliases + +- list_distance + ### `array_distinct` Returns distinct values from the array after removing duplicates. @@ -3224,6 +3255,10 @@ _Alias of [array_concat](#array_concat)._ _Alias of [array_dims](#array_dims)._ +### `list_distance` + +_Alias of [array_distance](#array_distance)._ + ### `list_distinct` _Alias of [array_dims](#array_distinct)._ From eae4ad5632ceb452bcb4dadc6a6099a528da5c66 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 12:16:11 +0800 Subject: [PATCH 09/10] Update bulletin Signed-off-by: Austin Liu --- docs/source/user-guide/sql/scalar_functions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 808fe30eacd5..41ba3b60102c 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2136,6 +2136,7 @@ to_unixtime(expression[, ..., format_n]) - [list_cat](#list_cat) - [list_concat](#list_concat) - [list_dims](#list_dims) +- [list_distance](#list_distance) - [list_distinct](#list_distinct) - [list_element](#list_element) - [list_except](#list_except) From 46ae58c1164622e16ca5f2c6fb69198f71e5f084 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Thu, 29 Aug 2024 12:20:21 +0800 Subject: [PATCH 10/10] Prettify example Signed-off-by: Austin Liu --- docs/source/user-guide/sql/scalar_functions.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 41ba3b60102c..f631114feabb 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2409,11 +2409,11 @@ array_distance(array1, array2) ``` > select array_distance([1, 2], [1, 4]); -+---------------------------------+ ++------------------------------------+ | array_distance(List([1,2], [1,4])) | -+---------------------------------+ -| 2.0 | -+---------------------------------+ ++------------------------------------+ +| 2.0 | ++------------------------------------+ ``` #### Aliases