Skip to content

Commit

Permalink
Improve ApproxPercentileAccumulator merge api and fix bug (apache#10056)
Browse files Browse the repository at this point in the history
* improve ApproxPercentileAccumulator merge api and fix bug

* add test for accumulator merge_digests

* fix test

* Reduce cloning in ApproxPercentileAccumulator

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and Omega359 committed Apr 16, 2024
1 parent d97fc46 commit 670d409
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
37 changes: 34 additions & 3 deletions datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion_common::{
ScalarValue,
};
use datafusion_expr::{Accumulator, ColumnarValue};
use std::{any::Any, iter, sync::Arc};
use std::{any::Any, sync::Arc};

/// APPROX_PERCENTILE_CONT aggregate expression
#[derive(Debug)]
Expand Down Expand Up @@ -284,7 +284,8 @@ impl ApproxPercentileAccumulator {
}

pub(crate) fn merge_digests(&mut self, digests: &[TDigest]) {
self.digest = TDigest::merge_digests(digests);
let digests = digests.iter().chain(std::iter::once(&self.digest));
self.digest = TDigest::merge_digests(digests)
}

pub(crate) fn convert_to_float(values: &ArrayRef) -> Result<Vec<f64>> {
Expand Down Expand Up @@ -425,7 +426,6 @@ impl Accumulator for ApproxPercentileAccumulator {
.collect::<Result<Vec<_>>>()
.map(|state| TDigest::from_scalar_state(&state))
})
.chain(iter::once(Ok(self.digest.clone())))
.collect::<Result<Vec<_>>>()?;

self.merge_digests(&states);
Expand All @@ -440,3 +440,34 @@ impl Accumulator for ApproxPercentileAccumulator {
- std::mem::size_of_val(&self.return_type)
}
}

#[cfg(test)]
mod tests {
use crate::aggregate::approx_percentile_cont::ApproxPercentileAccumulator;
use crate::aggregate::tdigest::TDigest;
use arrow_schema::DataType;

/// Checks that `ApproxPercentileAccumulator::merge_digests` adds incoming
/// digests to the accumulator's current digest instead of replacing it:
/// merging two 50_000-value digests one after the other must yield a
/// count of 100_000.
#[test]
fn test_combine_approx_percentile_accumulator() {
let mut digests: Vec<TDigest> = Vec::new();

// Build 50 TDigests, each fed the values 1 to 1_000
// (50 * 1_000 = 50_000 values in total).
for _ in 1..=50 {
// NOTE(review): 100 is presumably the digest's max size — confirm against `TDigest::new`.
let t = TDigest::new(100);
let values: Vec<_> = (1..=1_000).map(f64::from).collect();
// `merge_unsorted_f64` returns a digest, which shadows `t`.
let t = t.merge_unsorted_f64(values);
digests.push(t)
}

// Two merged digests built from the same 50 inputs; the assertions below
// expect each to contribute a count of 50_000.
let t1 = TDigest::merge_digests(&digests);
let t2 = TDigest::merge_digests(&digests);

// NOTE(review): arguments are presumably (percentile, return type, max size) — confirm.
let mut accumulator =
ApproxPercentileAccumulator::new_with_max_size(0.5, DataType::Float64, 100);

// First merge into the fresh accumulator: count equals that of `t1`.
accumulator.merge_digests(&[t1]);
assert_eq!(accumulator.digest.count(), 50_000.0);
// Second merge must add to the existing state, so the count doubles.
accumulator.merge_digests(&[t2]);
assert_eq!(accumulator.digest.count(), 100_000.0);
}
}
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/aggregate/tdigest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,10 @@ impl TDigest {
}

// Merge multiple T-Digests
pub(crate) fn merge_digests(digests: &[TDigest]) -> TDigest {
pub(crate) fn merge_digests<'a>(
digests: impl IntoIterator<Item = &'a TDigest>,
) -> TDigest {
let digests = digests.into_iter().collect::<Vec<_>>();
let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum();
if n_centroids == 0 {
return TDigest::default();
Expand Down

0 comments on commit 670d409

Please sign in to comment.