From a006450baa970d2d04e73d58b4f88bbaeb1aef26 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Thu, 14 Dec 2023 20:31:59 +0800
Subject: [PATCH] cleanup

Signed-off-by: jayzhan211
---
 datafusion/common/src/hash_utils.rs           | 22 +++++
 .../src/aggregate/array_agg_ordered.rs        | 99 +++++++++++++------
 2 files changed, 92 insertions(+), 29 deletions(-)

diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 9198461e00bf9..bc0bb529fb2b9 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -240,6 +240,24 @@ where
     Ok(())
 }
 
+/// Folds a hash of each struct row (over all child columns) into `hashes_buffer`.
+fn hash_struct_array(
+    array: &StructArray,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+) -> Result<()> {
+    // Hash children into a scratch buffer so incoming hashes are combined, not
+    // overwritten. Rows where the struct itself is null keep their incoming hash.
+    let mut row_hashes = vec![0u64; array.len()];
+    create_hashes(array.columns(), random_state, &mut row_hashes)?;
+    for (i, (hash, row_hash)) in hashes_buffer.iter_mut().zip(row_hashes).enumerate() {
+        if array.is_valid(i) {
+            *hash = combine_hashes(*hash, row_hash);
+        }
+    }
+    Ok(())
+}
+
 /// Test version of `create_hashes` that produces the same value for
 /// all hashes (to test collisions)
 ///
@@ -335,6 +353,10 @@ pub fn create_hashes<'a>(
                 let array = as_large_list_array(array);
                 hash_list_array(array, random_state, hashes_buffer)?;
             }
+            DataType::Struct(_) => {
+                let array = array.as_struct();
+                hash_struct_array(array, random_state, hashes_buffer)?;
+            }
             _ => {
                 // This is internal because we should have caught this before.
return _internal_err!( diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index f88ec36894ccd..da8b8bdb1e387 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -219,10 +219,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { // values received from its ordering requirement expression. (This information is necessary for during merging). let agg_orderings = &states[1]; - println!( - "array_agg_values: {:?}, agg_orderings: {:?}", - array_agg_values, agg_orderings - ); + // println!( + // "array_agg_values: {:?}, agg_orderings: {:?}", + // array_agg_values, agg_orderings + // ); if let Some(agg_orderings) = agg_orderings.as_list_opt::() { // Stores ARRAY_AGG results coming from each partition @@ -233,7 +233,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { // Existing values should be merged also. partition_values.push(self.values.clone()); partition_ordering_values.push(self.ordering_values.clone()); - println!("(1) partition_ordering_values: {:?}", partition_ordering_values); + // println!("(1) partition_ordering_values: {:?}", partition_ordering_values); assert!(as_list_array(array_agg_values).is_ok()); // Convert array to Scalars to sort them easily. Convert back to array at evaluation. 
@@ -241,23 +241,64 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { ScalarValue::convert_list_array_to_scalar_vec(array_agg_values)?; partition_values.extend(array_agg_res); - println!("agg_orderings: {:?}", agg_orderings); - let mut field_values: Vec> = Vec::new(); - assert_eq!(agg_orderings.len(), 1); - let agg_ordering_columns = agg_orderings.values(); - let agg_ordering_columns = as_struct_array(agg_ordering_columns)?; - let num_columns = agg_ordering_columns.num_columns(); - for col_index in 0..num_columns { - let col_array = agg_ordering_columns.column(col_index); - println!("col_array: {:?}", col_array); - let col_scalar = - ScalarValue::convert_non_list_array_to_scalars(&col_array)?; - println!("col_scalar: {:?}", col_scalar); - field_values.push(col_scalar); + // println!("agg_orderings: {:?}", agg_orderings); + + let orderings = ScalarValue::convert_list_array_to_scalar_vec(agg_orderings)?; + // println!("orderings: {:?}", orderings); + for partition_ordering_rows in orderings.into_iter() { + // Extract value from struct to ordering_rows for each group/partition + let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| { + if let ScalarValue::Struct(ordering_columns_per_row) = ordering_row { + // Struct([StructArray + // [ + // -- child 0: "c2@1 DESC" (Int32) + // PrimitiveArray + // [ + // 30, + // ] + // -- child 1: "c3@2 ASC NULLS LAST" (Int32) + // PrimitiveArray + // [ + // 4, + // ] + // ]] + let s = ordering_columns_per_row.as_struct(); + let num_columns = s.num_columns(); + let mut ordering_columns_per_row = vec![]; + for col_index in 0..num_columns { + let col_array = s.column(col_index); + let sv = ScalarValue::try_from_array(col_array, 0)?; + ordering_columns_per_row.push(sv); + } + + Ok(ordering_columns_per_row) + } else { + exec_err!( + "Expects to receive ScalarValue::Struct(Arc) but got:{:?}", + ordering_row.data_type() + ) + } + }).collect::>>()?; + partition_ordering_values.push(ordering_value); } - 
partition_ordering_values.push(field_values); - println!("partition_ordering_values: {:?}", partition_ordering_values); + + // let mut field_values: Vec> = Vec::new(); + // assert_eq!(agg_orderings.len(), 1); + // let agg_ordering_columns = agg_orderings.values(); + // let agg_ordering_columns = as_struct_array(agg_ordering_columns)?; + // let num_columns = agg_ordering_columns.num_columns(); + // for col_index in 0..num_columns { + // let col_array = agg_ordering_columns.column(col_index); + // println!("col_array: {:?}", col_array); + // let col_scalar = + // ScalarValue::convert_non_list_array_to_scalars(&col_array)?; + // println!("col_scalar: {:?}", col_scalar); + // field_values.push(col_scalar); + // } + + // partition_ordering_values.push(field_values); + // println!("partition_ordering_values: {:?}", partition_ordering_values); let sort_options = self .ordering_req @@ -280,15 +321,15 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { fn state(&self) -> Result> { let mut result = vec![self.evaluate()?]; result.push(self.evaluate_orderings()?); - println!("result: {:?}", result); + // println!("result: {:?}", result); Ok(result) } fn evaluate(&self) -> Result { let arr = ScalarValue::new_list(&self.values, &self.datatypes[0]); - println!("self.vales: {:?}", self.values); - println!("arr: {:?}", arr); - println!("self.datatypes: {:?}", self.datatypes); + // println!("self.vales: {:?}", self.values); + // println!("arr: {:?}", arr); + // println!("self.datatypes: {:?}", self.datatypes); Ok(ScalarValue::List(arr)) } @@ -321,10 +362,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { impl OrderSensitiveArrayAggAccumulator { fn evaluate_orderings(&self) -> Result { let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]); - println!("fields: {:?}", fields); + // println!("fields: {:?}", fields); let struct_field = Fields::from(fields.clone()); - println!("struct_field: {:?}", struct_field); - println!("self.ordering_values: 
{:?}", self.ordering_values); + // println!("struct_field: {:?}", struct_field); + // println!("self.ordering_values: {:?}", self.ordering_values); // let arr_vec = self // .ordering_values // .iter() @@ -340,7 +381,7 @@ impl OrderSensitiveArrayAggAccumulator { .iter() .map(|x| x[i].clone()) .collect::>(); - println!("column_values: {:?}", column_values); + // println!("column_values: {:?}", column_values); let array = if column_values.is_empty() { new_empty_array(fields[i].data_type()) } else { @@ -350,7 +391,7 @@ impl OrderSensitiveArrayAggAccumulator { } let ordering_array = StructArray::try_new(struct_field.clone(), column_wise_ordering_values, None)?; - println!("ordering_array: {:?}", ordering_array); + // println!("ordering_array: {:?}", ordering_array); let a = array_into_list_array(Arc::new(ordering_array)); let a = Arc::new(a); Ok(ScalarValue::List(a))