Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ScalarValue::List to store ArrayRef #7629

Merged
merged 74 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
30c84d3
first draft
jayzhan211 Sep 18, 2023
0d1c79a
revert subquery
jayzhan211 Sep 18, 2023
fd3f050
Decimal128 in list_to_array
jayzhan211 Sep 18, 2023
bcccbed
refactor null in list_to_array
jayzhan211 Sep 18, 2023
c3a06a0
cleanup iter to array v3
jayzhan211 Sep 18, 2023
e81ec13
remove comment
jayzhan211 Sep 19, 2023
3e23a13
cleanup
jayzhan211 Sep 19, 2023
1043107
cleanup struct array
jayzhan211 Sep 19, 2023
2a4a6d2
remove new_list in array
jayzhan211 Sep 19, 2023
27363e6
cleanup
jayzhan211 Sep 19, 2023
5bd23f6
cleanup
jayzhan211 Sep 19, 2023
1d853b3
cleanup list to array
jayzhan211 Sep 19, 2023
2af1aba
cleanup list to array
jayzhan211 Sep 19, 2023
d0186f4
remove try from array v2
jayzhan211 Sep 19, 2023
6f6baf2
remove try from array v4
jayzhan211 Sep 19, 2023
891d425
cleanup expr_simply
jayzhan211 Sep 19, 2023
3441ebc
cleanup iter to array v3
jayzhan211 Sep 20, 2023
f177a64
checkpoint
jayzhan211 Sep 21, 2023
108f179
remove try from array v3
jayzhan211 Sep 21, 2023
ceefaa9
remove iter to array v4
jayzhan211 Sep 21, 2023
4fffb1f
rewrite iter to array list v2
jayzhan211 Sep 22, 2023
138fd9b
cleanup iter to array list v3
jayzhan211 Sep 22, 2023
b05318a
remove test
jayzhan211 Sep 22, 2023
b002f75
refactor wrap_into_list_array
jayzhan211 Sep 22, 2023
165f0fc
cleanup
jayzhan211 Sep 22, 2023
7f7580b
cleanup
jayzhan211 Sep 23, 2023
f756c61
revert math expressions
jayzhan211 Sep 23, 2023
51f94ed
remove iter to array v3
jayzhan211 Sep 23, 2023
07edc88
fmt
jayzhan211 Sep 23, 2023
be6afe7
cleanup
jayzhan211 Sep 23, 2023
8d8fc43
cleanup
jayzhan211 Sep 23, 2023
bc87f8b
cleanup
jayzhan211 Sep 23, 2023
c2ec867
cleanup
jayzhan211 Sep 23, 2023
937792b
fix check
jayzhan211 Sep 23, 2023
2d48fc2
fix build_a_list_array_from_scalars
jayzhan211 Sep 24, 2023
85a0da6
fmt
jayzhan211 Sep 24, 2023
0f0af8a
clippy fix
jayzhan211 Sep 24, 2023
aee9870
fix scalars test
jayzhan211 Sep 24, 2023
4600ed1
fix array agg distinct
jayzhan211 Sep 25, 2023
4b598f6
fix agg test
jayzhan211 Sep 27, 2023
c925179
cleanup
jayzhan211 Sep 28, 2023
4433c6b
add hash for scalar::Listarr
jayzhan211 Sep 30, 2023
544c0fd
fix count distinct
jayzhan211 Sep 30, 2023
6d96046
fmt
jayzhan211 Sep 30, 2023
bda350c
comment out scalarvalue list in proto
jayzhan211 Sep 30, 2023
73efc56
rename
jayzhan211 Oct 2, 2023
c018b7a
rename and add comment
jayzhan211 Oct 2, 2023
96b342a
reduce tryFromArray for List
jayzhan211 Oct 2, 2023
389a3ab
reduce tryFromArray for FixSizeList
jayzhan211 Oct 2, 2023
3768db1
fmt
jayzhan211 Oct 2, 2023
95626fa
proto for list
jayzhan211 Oct 3, 2023
57ea1fe
cleanup
jayzhan211 Oct 3, 2023
2403c9b
replace List with ListArr
jayzhan211 Oct 4, 2023
067049b
remove build a list ... func
jayzhan211 Oct 4, 2023
035c3d0
refactor merge_batch in DistinctBitXorAccumulator
jayzhan211 Oct 4, 2023
119e726
rewrite merge batch in ArrayAggAccumulator
jayzhan211 Oct 4, 2023
6e32c42
clippy
jayzhan211 Oct 4, 2023
c06d96b
cleanup
jayzhan211 Oct 4, 2023
f772d0d
return array directly in array()
jayzhan211 Oct 4, 2023
633afec
add expected message
jayzhan211 Oct 4, 2023
ef92173
fmt
jayzhan211 Oct 4, 2023
9d85585
rename and add doc test for core function
jayzhan211 Oct 4, 2023
2a16175
fix ci
jayzhan211 Oct 4, 2023
ca11115
fix type
jayzhan211 Oct 4, 2023
0e43051
rename and address comment
jayzhan211 Oct 11, 2023
76a0a73
address comment
jayzhan211 Oct 11, 2023
577612b
rewrite roundtrip scalars
jayzhan211 Oct 11, 2023
82c8802
fix flatten
jayzhan211 Oct 12, 2023
b7daf86
cleanup
jayzhan211 Oct 12, 2023
1924682
add partial cmp
jayzhan211 Oct 12, 2023
a11d5fd
fmt
jayzhan211 Oct 12, 2023
226d110
add comment
jayzhan211 Oct 13, 2023
116397a
cleanup
jayzhan211 Oct 13, 2023
c59ec00
Merge remote-tracking branch 'apache/main' into arrayref-for-scalarva…
alamb Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
897 changes: 585 additions & 312 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
Expand Down Expand Up @@ -334,6 +336,18 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}

/// Wraps `arr` into a `ListArray` that holds exactly one list element.
///
/// The whole input becomes the single entry of the returned list, so
/// `[1, 2, 3]` turns into `[[1, 2, 3]]`.
///
/// The child field is named `"item"`, takes its data type from `arr`, and is
/// declared nullable. No null buffer is passed to the list itself.
pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
    // Describe the child values before `arr` is moved into the list.
    let item_field = Arc::new(Field::new("item", arr.data_type().to_owned(), true));
    // One length entry: a single list spanning every value in `arr`.
    let list_offsets = OffsetBuffer::from_lengths([arr.len()]);
    ListArray::new(item_field, list_offsets, arr, None)
}

/// An extension trait for smart pointers. Provides an interface to get a
/// raw pointer to the data (with metadata stripped away).
///
Expand Down
36 changes: 17 additions & 19 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
let column = actual[0].column(0);
assert_eq!(column.len(), 1);

if let ScalarValue::List(Some(mut v), _) = ScalarValue::try_from_array(column, 0)? {
// workaround lack of Ord of ScalarValue
let cmp = |a: &ScalarValue, b: &ScalarValue| {
a.partial_cmp(b).expect("Can compare ScalarValues")
};
v.sort_by(cmp);
assert_eq!(
*v,
vec![
ScalarValue::UInt32(Some(1)),
ScalarValue::UInt32(Some(2)),
ScalarValue::UInt32(Some(3)),
ScalarValue::UInt32(Some(4)),
ScalarValue::UInt32(Some(5))
]
);
} else {
unreachable!();
}
let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?;
let mut scalars = scalar_vec[0].clone();
// workaround lack of Ord of ScalarValue
let cmp = |a: &ScalarValue, b: &ScalarValue| {
a.partial_cmp(b).expect("Can compare ScalarValues")
};
scalars.sort_by(cmp);
assert_eq!(
scalars,
vec![
ScalarValue::UInt32(Some(1)),
ScalarValue::UInt32(Some(2)),
ScalarValue::UInt32(Some(3)),
ScalarValue::UInt32(Some(4)),
ScalarValue::UInt32(Some(5))
]
);

Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use arrow::{
error::ArrowError,
record_batch::RecordBatch,
};
use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter};
use datafusion_common::{
cast::{as_large_list_array, as_list_array},
tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter},
};
use datafusion_common::{
exec_err, internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
};
Expand Down Expand Up @@ -392,8 +395,11 @@ impl<'a> ConstEvaluator<'a> {
"Could not evaluate the expression, found a result of length {}",
a.len()
)
} else if as_list_array(&a).is_ok() || as_large_list_array(&a).is_ok() {
Ok(ScalarValue::List(a))
jayzhan211 marked this conversation as resolved.
Show resolved Hide resolved
} else {
Ok(ScalarValue::try_from_array(&a, 0)?)
// Non-ListArray
ScalarValue::try_from_array(&a, 0)
}
}
ColumnarValue::Scalar(s) => Ok(s),
Expand Down
161 changes: 90 additions & 71 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::Array;
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::wrap_into_list_array;
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -102,7 +105,7 @@ impl PartialEq<dyn Any> for ArrayAgg {

#[derive(Debug)]
pub(crate) struct ArrayAggAccumulator {
values: Vec<ScalarValue>,
values: Vec<ArrayRef>,
datatype: DataType,
}

Expand All @@ -117,50 +120,60 @@ impl ArrayAggAccumulator {
}

impl Accumulator for ArrayAggAccumulator {
// Append value like Int64Array(1,2,3)
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
assert!(values.len() == 1, "array_agg can only take 1 param!");
let arr = &values[0];
(0..arr.len()).try_for_each(|index| {
let scalar = ScalarValue::try_from_array(arr, index)?;
self.values.push(scalar);
Ok(())
})
let val = values[0].clone();
self.values.push(val);
Ok(())
}

// Append value like ListArray(Int64Array(1,2,3), Int64Array(4,5,6))
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
assert!(states.len() == 1, "array_agg states must be singleton!");
let arr = &states[0];
(0..arr.len()).try_for_each(|index| {
let scalar = ScalarValue::try_from_array(arr, index)?;
if let ScalarValue::List(Some(values), _) = scalar {
self.values.extend(values);
Ok(())
} else {
internal_err!("array_agg state must be list!")
}
})

let list_arr = as_list_array(&states[0])?;
for arr in list_arr.iter().flatten() {
self.values.push(arr);
}
Ok(())
}

// The state is a one-element vector holding the result of `evaluate`.
fn state(&self) -> Result<Vec<ScalarValue>> {
    let aggregated = self.evaluate()?;
    Ok(vec![aggregated])
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_list(
Some(self.values.clone()),
self.datatype.clone(),
))
// Transform Vec<ListArr> to ListArr

let element_arrays: Vec<&dyn Array> =
self.values.iter().map(|a| a.as_ref()).collect();

if element_arrays.is_empty() {
let arr = ScalarValue::new_list(&[], &self.datatype);
return Ok(ScalarValue::List(arr));
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let list_array = wrap_into_list_array(concated_array);

Ok(ScalarValue::List(Arc::new(list_array)))
}

fn size(&self) -> usize {
std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.values)
- std::mem::size_of_val(&self.values)
std::mem::size_of_val(self)
+ (std::mem::size_of::<ArrayRef>() * self.values.capacity())
+ self
.values
.iter()
.map(|arr| arr.get_array_memory_size())
.sum::<usize>()
+ self.datatype.size()
- std::mem::size_of_val(&self.datatype)
}
Expand All @@ -176,72 +189,78 @@ mod tests {
use arrow::array::Int32Array;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use arrow_array::ListArray;
use arrow_buffer::OffsetBuffer;
use datafusion_common::DataFusionError;
use datafusion_common::Result;

#[test]
fn array_agg_i32() -> Result<()> {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

let list = ScalarValue::new_list(
Some(vec![
ScalarValue::Int32(Some(1)),
ScalarValue::Int32(Some(2)),
ScalarValue::Int32(Some(3)),
ScalarValue::Int32(Some(4)),
ScalarValue::Int32(Some(5)),
]),
DataType::Int32,
);
let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
])]);
let list = ScalarValue::List(Arc::new(list));

generic_test_op!(a, DataType::Int32, ArrayAgg, list, DataType::Int32)
}

#[test]
fn array_agg_nested() -> Result<()> {
let l1 = ScalarValue::new_list(
Some(vec![
ScalarValue::new_list(
Some(vec![
ScalarValue::from(1i32),
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
DataType::Int32,
),
ScalarValue::new_list(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
DataType::Int32,
),
]),
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
Some(1),
Some(2),
Some(3),
])]);
let a2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
Some(4),
Some(5),
])]);
let l1 = ListArray::new(
Arc::new(Field::new("item", a1.data_type().to_owned(), true)),
OffsetBuffer::from_lengths([a1.len() + a2.len()]),
arrow::compute::concat(&[&a1, &a2])?,
None,
);

let l2 = ScalarValue::new_list(
Some(vec![
ScalarValue::new_list(
Some(vec![ScalarValue::from(6i32)]),
DataType::Int32,
),
ScalarValue::new_list(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
DataType::Int32,
),
]),
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
let a1 =
ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(6)])]);
let a2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
Some(7),
Some(8),
])]);
let l2 = ListArray::new(
Arc::new(Field::new("item", a1.data_type().to_owned(), true)),
OffsetBuffer::from_lengths([a1.len() + a2.len()]),
arrow::compute::concat(&[&a1, &a2])?,
None,
);

let l3 = ScalarValue::new_list(
Some(vec![ScalarValue::new_list(
Some(vec![ScalarValue::from(9i32)]),
DataType::Int32,
)]),
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
let a1 =
ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(9)])]);
let l3 = ListArray::new(
Arc::new(Field::new("item", a1.data_type().to_owned(), true)),
OffsetBuffer::from_lengths([a1.len()]),
arrow::compute::concat(&[&a1])?,
None,
);

let list = ScalarValue::new_list(
Some(vec![l1.clone(), l2.clone(), l3.clone()]),
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
let list = ListArray::new(
Arc::new(Field::new("item", l1.data_type().to_owned(), true)),
OffsetBuffer::from_lengths([l1.len() + l2.len() + l3.len()]),
arrow::compute::concat(&[&l1, &l2, &l3])?,
None,
);
let list = ScalarValue::List(Arc::new(list));
let l1 = ScalarValue::List(Arc::new(l1));
let l2 = ScalarValue::List(Arc::new(l2));
let l3 = ScalarValue::List(Arc::new(l3));

let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();

Expand Down
Loading