Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Jan 29, 2024
1 parent 1a40084 commit 98afb20
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 87 deletions.
65 changes: 30 additions & 35 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use arrow::{
},
};
use arrow_array::cast::as_list_array;
use arrow_array::types::ArrowTimestampType;
use arrow_array::{ArrowNativeTypeOp, Scalar};
use arrow_buffer::{Buffer, NullBuffer};

Expand Down Expand Up @@ -376,31 +375,7 @@ impl PartialOrd for ScalarValue {
}
(List(_), _) | (LargeList(_), _) | (FixedSizeList(_), _) => None,
(Struct(struct_arr1), Struct(struct_arr2)) => {
if struct_arr1.len() != struct_arr2.len() {
return None;
}

if struct_arr1.data_type() != struct_arr2.data_type() {
return None;
}

for col_index in 0..struct_arr1.num_columns() {
let arr1 = struct_arr1.column(col_index);
let arr2 = struct_arr2.column(col_index);

let lt_res = arrow::compute::kernels::cmp::lt(arr1, arr2).ok()?;
let eq_res = arrow::compute::kernels::cmp::eq(arr1, arr2).ok()?;

for j in 0..lt_res.len() {
if lt_res.is_valid(j) && lt_res.value(j) {
return Some(Ordering::Less);
}
if eq_res.is_valid(j) && !eq_res.value(j) {
return Some(Ordering::Greater);
}
}
}
Some(Ordering::Equal)
partial_cmp_struct(struct_arr1, struct_arr2)
}
(Struct(_), _) => None,
(Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
Expand Down Expand Up @@ -496,6 +471,34 @@ fn partial_cmp_list(arr1: &dyn Array, arr2: &dyn Array) -> Option<Ordering> {
Some(Ordering::Equal)
}

fn partial_cmp_struct(s1: &Arc<StructArray>, s2: &Arc<StructArray>) -> Option<Ordering> {
if s1.len() != s2.len() {
return None;
}

if s1.data_type() != s2.data_type() {
return None;
}

for col_index in 0..s1.num_columns() {
let arr1 = s1.column(col_index);
let arr2 = s2.column(col_index);

let lt_res = arrow::compute::kernels::cmp::lt(arr1, arr2).ok()?;
let eq_res = arrow::compute::kernels::cmp::eq(arr1, arr2).ok()?;

for j in 0..lt_res.len() {
if lt_res.is_valid(j) && lt_res.value(j) {
return Some(Ordering::Less);
}
if eq_res.is_valid(j) && !eq_res.value(j) {
return Some(Ordering::Greater);
}
}
}
Some(Ordering::Equal)
}

impl Eq for ScalarValue {}

//Float wrapper over f32/f64. Just because we cannot build std::hash::Hash for floats directly we have to do it through type wrapper
Expand Down Expand Up @@ -3324,21 +3327,13 @@ mod tests {
use crate::cast::{as_string_array, as_uint32_array, as_uint64_array};

use arrow::buffer::OffsetBuffer;
use arrow::compute::{concat, is_null, kernels};
use arrow::compute::{is_null, kernels};
use arrow::datatypes::{ArrowNumericType, ArrowPrimitiveType};
use arrow::util::pretty::pretty_format_columns;
use arrow_buffer::Buffer;
use chrono::NaiveDate;
use rand::Rng;

use arrow::compute::kernels;
use arrow::datatypes::ArrowPrimitiveType;
use arrow::{buffer::OffsetBuffer, compute::is_null};
use arrow_array::ArrowNumericType;

use chrono::NaiveDate;
use rand::Rng;

#[test]
fn test_scalar_value_from_for_struct() {
let boolean = Arc::new(BooleanArray::from(vec![false]));
Expand Down
51 changes: 15 additions & 36 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ use crate::{
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{new_empty_array, StructArray};
use arrow_schema::{Fields, SortOptions};

use datafusion_common::cast::as_list_array;
use datafusion_common::internal_err;
use datafusion_common::utils::array_into_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
Expand Down Expand Up @@ -245,22 +242,17 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
let mut partition_ordering_values = vec![];

// Existing values should be merged also.
partition_values.push(self.values.clone());
partition_ordering_values.push(self.ordering_values.clone());
partition_values.push(self.values.clone().into());
partition_ordering_values.push(self.ordering_values.clone().into());

// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
partition_values.extend(array_agg_res);
for v in array_agg_res.into_iter() {
partition_values.push(v.into());
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;

for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
Expand All @@ -274,13 +266,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {

Ok(ordering_columns_per_row)
} else {
internal_err!(
exec_err!(
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;
// }).collect::<Result<Vec<_>>>()?;
}).collect::<Result<VecDeque<_>>>()?;

partition_ordering_values.push(ordering_value);
}

Expand All @@ -289,13 +281,12 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
.iter()
.map(|sort_expr| sort_expr.options)
.collect::<Vec<_>>();
let (new_values, new_orderings) = merge_ordered_arrays(
&partition_values,
&partition_ordering_values,

(self.values, self.ordering_values) = merge_ordered_arrays(
&mut partition_values,
&mut partition_ordering_values,
&sort_options,
)?;
self.values = new_values;
self.ordering_values = new_orderings;

Ok(())
}
Expand All @@ -314,10 +305,6 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0])
};
Ok(ScalarValue::List(array))
// Ok(ScalarValue::List(ScalarValue::new_list(
// &self.values,
// &self.datatypes[0],
// )))
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -348,18 +335,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);

// let orderings: Vec<ScalarValue> = self
// .ordering_values
// .iter()
// .map(|ordering| {
// ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
// })
// .collect();
// let struct_type = DataType::Struct(struct_field);
let mut column_wise_ordering_values = vec![];
let num_columns = fields.len();
let struct_field = Fields::from(fields.clone());

let mut column_wise_ordering_values = vec![];
for i in 0..num_columns {
let column_values = self
.ordering_values
Expand Down
53 changes: 37 additions & 16 deletions datafusion/physical-expr/src/aggregate/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::{
};

use arrow_array::cast::AsArray;
use arrow_array::ArrayRef;
use arrow_array::{new_empty_array, ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -270,7 +270,14 @@ impl Accumulator for NthValueAccumulator {
let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
// Extract value from struct to ordering_rows for each group/partition
partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
Expand Down Expand Up @@ -305,7 +312,7 @@ impl Accumulator for NthValueAccumulator {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate_values()];
if !self.ordering_req.is_empty() {
result.push(self.evaluate_orderings());
result.push(self.evaluate_orderings()?);
}
Ok(result)
}
Expand Down Expand Up @@ -354,21 +361,35 @@ impl Accumulator for NthValueAccumulator {
}

impl NthValueAccumulator {
fn evaluate_orderings(&self) -> ScalarValue {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);
let struct_field = Fields::from(fields.clone());

let orderings = self
.ordering_values
.iter()
.map(|ordering| {
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
})
.collect::<Vec<_>>();
let struct_type = DataType::Struct(struct_field);
let mut column_wise_ordering_values = vec![];
let num_columns = fields.len();
for i in 0..num_columns {
let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}

let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
ScalarValue::List(ScalarValue::new_list(&orderings, &struct_type))
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
))))
}

fn evaluate_values(&self) -> ScalarValue {
Expand Down

0 comments on commit 98afb20

Please sign in to comment.