Skip to content

Commit

Permalink
refactor: rebase onto main
Browse files Browse the repository at this point in the history
  • Loading branch information
domodwyer committed Jan 27, 2022
1 parent e8f8e3f commit faa8094
Showing 1 changed file with 20 additions and 29 deletions.
49 changes: 20 additions & 29 deletions datafusion/src/physical_plan/expressions/approx_quantile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};
use std::{any::Any, iter, sync::Arc};

use arrow::{
array::{
Expand Down Expand Up @@ -151,9 +151,7 @@ impl AggregateExpr for ApproxQuantile {

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
t
@
(DataType::UInt8
t @ (DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
Expand Down Expand Up @@ -202,17 +200,6 @@ impl Accumulator for ApproxQuantileAccumulator {
Ok(self.digest.to_scalar_state())
}

/// Feeds one row into the accumulator.
///
/// `values` is expected to hold exactly one element (checked only in debug
/// builds). A clone of that element is handed to `merge_unsorted` and the
/// digest it returns replaces the current one; any error from
/// `merge_unsorted` is returned to the caller.
fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
    debug_assert_eq!(
        values.len(),
        1,
        "invalid number of values in quantile update"
    );

    // Only the first value is consumed; indexing panics if `values` is empty.
    let sample = values[0].clone();
    let updated = self.digest.merge_unsorted([sample])?;
    self.digest = updated;

    Ok(())
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
debug_assert_eq!(
values.len(),
Expand Down Expand Up @@ -273,19 +260,6 @@ impl Accumulator for ApproxQuantileAccumulator {
Ok(())
}

/// Combines one serialized digest state with this accumulator.
///
/// `states` is expected to carry six fields (checked only in debug builds).
/// It is decoded with `TDigest::from_scalar_state`, and the result of
/// merging it with a clone of the current digest becomes the new digest.
fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
    debug_assert_eq!(
        states.len(),
        6,
        "invalid number of state fields for quantile accumulator"
    );

    let incoming = TDigest::from_scalar_state(states);
    // Current digest first, incoming digest second.
    let pair = [self.digest.clone(), incoming];
    self.digest = TDigest::merge_digests(&pair);

    Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
let q = self.digest.estimate_quantile(self.quantile);

Expand All @@ -307,6 +281,23 @@ impl Accumulator for ApproxQuantileAccumulator {
}

/// Merges a batch of serialized digest states into this accumulator.
///
/// `states` holds one array per state field. Row `i` taken across all arrays
/// is converted to `ScalarValue`s and decoded into a `TDigest` via
/// `TDigest::from_scalar_state`. The decoded digests, followed by a clone of
/// the current digest, are then combined with `TDigest::merge_digests`.
///
/// Returns `Ok(())` without touching the digest when `states` is empty.
///
/// # Errors
///
/// Returns the first error produced by `ScalarValue::try_from_array`.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
    if states.is_empty() {
        return Ok(());
    }

    // The row count is taken from the first array.
    // NOTE(review): assumes every state array has the same length — confirm against callers.
    let digests = (0..states[0].len())
        .map(|index| {
            // Gather the `index`-th value of every state field, then decode the row.
            states
                .iter()
                .map(|array| ScalarValue::try_from_array(array, index))
                .collect::<Result<Vec<_>>>()
                .map(|state| TDigest::from_scalar_state(&state))
        })
        // Include the accumulator's own digest so existing data is retained.
        .chain(iter::once(Ok(self.digest.clone())))
        .collect::<Result<Vec<_>>>()?;

    self.digest = TDigest::merge_digests(&digests);

    Ok(())
}
}

0 comments on commit faa8094

Please sign in to comment.