From da3fa9c1ec0e1481f425d07225dd5a8d19ba4374 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 12 Apr 2024 16:12:00 +0800 Subject: [PATCH 1/4] improve ApproxPercentileAccumulator merge api and fix bug --- .../physical-expr/src/aggregate/approx_percentile_cont.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 3dbf1679e230..b9e4323744ec 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -34,7 +34,7 @@ use datafusion_common::{ ScalarValue, }; use datafusion_expr::{Accumulator, ColumnarValue}; -use std::{any::Any, iter, sync::Arc}; +use std::{any::Any, sync::Arc}; /// APPROX_PERCENTILE_CONT aggregate expression #[derive(Debug)] @@ -284,7 +284,9 @@ impl ApproxPercentileAccumulator { } pub(crate) fn merge_digests(&mut self, digests: &[TDigest]) { - self.digest = TDigest::merge_digests(digests); + let mut input_digests = digests.to_vec(); + input_digests.push(self.digest.clone()); + self.digest = TDigest::merge_digests(input_digests.as_slice()); } pub(crate) fn convert_to_float(values: &ArrayRef) -> Result> { @@ -425,7 +427,6 @@ impl Accumulator for ApproxPercentileAccumulator { .collect::>>() .map(|state| TDigest::from_scalar_state(&state)) }) - .chain(iter::once(Ok(self.digest.clone()))) .collect::>>()?; self.merge_digests(&states); From ac7dbd60997e52bc47e519886827cbdbaaf54c13 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 15 Apr 2024 11:29:23 +0800 Subject: [PATCH 2/4] add test for accumulator merge_digests --- .../src/aggregate/approx_percentile_cont.rs | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index b9e4323744ec..7e0d800eb0f4 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -441,3 +441,41 @@ impl Accumulator for ApproxPercentileAccumulator { - std::mem::size_of_val(&self.return_type) } } + +#[cfg(test)] +mod tests { + use crate::aggregate::approx_percentile_cont::ApproxPercentileAccumulator; + use crate::aggregate::tdigest::TDigest; + use arrow_schema::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::Accumulator; + + #[test] + fn test_combine_approx_percentile_accumulator() { + let mut digests: Vec = Vec::new(); + + // one TDigest with 50_000 values from 1 to 1_000 + for _ in 1..=50 { + let t = TDigest::new(100); + let values: Vec<_> = (1..=1_000).map(f64::from).collect(); + let t = t.merge_unsorted_f64(values); + digests.push(t) + } + + let t1 = TDigest::merge_digests(&digests); + let t2 = TDigest::merge_digests(&digests); + + let mut accumulator = + ApproxPercentileAccumulator::new_with_max_size(0.5, DataType::Float64, 100); + + accumulator.merge_digests(&[t1]); + assert_eq!(accumulator.digest.count(), 50_000.0); + accumulator.merge_digests(&[t2]); + assert_eq!(accumulator.digest.count(), 100_000.0); + + assert_eq!( + accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(500.0)) + ); + } +} From ed4c1a84c4003e473035976b2abca11761457ba2 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 15 Apr 2024 13:23:42 +0800 Subject: [PATCH 3/4] fix test --- .../physical-expr/src/aggregate/approx_percentile_cont.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 7e0d800eb0f4..5b5395a76ce3 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -447,8 +447,6 @@ mod tests { use crate::aggregate::approx_percentile_cont::ApproxPercentileAccumulator; use crate::aggregate::tdigest::TDigest; use arrow_schema::DataType; - use datafusion_common::ScalarValue; - use datafusion_expr::Accumulator; #[test] fn test_combine_approx_percentile_accumulator() { @@ -472,10 +470,5 @@ mod tests { assert_eq!(accumulator.digest.count(), 50_000.0); accumulator.merge_digests(&[t2]); assert_eq!(accumulator.digest.count(), 100_000.0); - - assert_eq!( - accumulator.evaluate().unwrap(), - ScalarValue::Float64(Some(500.0)) - ); } } From 0444e7257b266b7b17b05e4be532be110e59342e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 15 Apr 2024 06:49:18 -0400 Subject: [PATCH 4/4] Reduce cloneing in ApproxPercentileAccumulator --- .../physical-expr/src/aggregate/approx_percentile_cont.rs | 5 ++--- datafusion/physical-expr/src/aggregate/tdigest.rs | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 5b5395a76ce3..63a4c85f9e80 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -284,9 +284,8 @@ impl ApproxPercentileAccumulator { } pub(crate) fn merge_digests(&mut self, digests: &[TDigest]) { - let mut input_digests = digests.to_vec(); - input_digests.push(self.digest.clone()); - self.digest = TDigest::merge_digests(input_digests.as_slice()); + let digests = digests.iter().chain(std::iter::once(&self.digest)); + self.digest = TDigest::merge_digests(digests) } pub(crate) fn convert_to_float(values: &ArrayRef) -> Result> { diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs index 78708df94c25..e3b23b91d0ff 100644 --- a/datafusion/physical-expr/src/aggregate/tdigest.rs +++ b/datafusion/physical-expr/src/aggregate/tdigest.rs @@ -370,7 +370,10 @@ impl TDigest { } // Merge multiple T-Digests - pub(crate) fn merge_digests(digests: &[TDigest]) -> TDigest { + pub(crate) fn merge_digests<'a>( + digests: impl IntoIterator, + ) -> TDigest { + let digests = digests.into_iter().collect::>(); let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum(); if n_centroids == 0 { return TDigest::default();