Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ScalarValue::List to store ArrayRef #7629

Merged
merged 74 commits into from
Oct 16, 2023
Merged
Changes from 1 commit
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
30c84d3
first draft
jayzhan211 Sep 18, 2023
0d1c79a
revert subquery
jayzhan211 Sep 18, 2023
fd3f050
Decimal128 in list_to_array
jayzhan211 Sep 18, 2023
bcccbed
refactor null in list_to_array
jayzhan211 Sep 18, 2023
c3a06a0
cleanup iter to array v3
jayzhan211 Sep 18, 2023
e81ec13
remove comment
jayzhan211 Sep 19, 2023
3e23a13
cleanup
jayzhan211 Sep 19, 2023
1043107
cleanup struct array
jayzhan211 Sep 19, 2023
2a4a6d2
remove new_list in array
jayzhan211 Sep 19, 2023
27363e6
cleanup
jayzhan211 Sep 19, 2023
5bd23f6
cleanup
jayzhan211 Sep 19, 2023
1d853b3
cleanup list to array
jayzhan211 Sep 19, 2023
2af1aba
cleanup list to array
jayzhan211 Sep 19, 2023
d0186f4
remove try from array v2
jayzhan211 Sep 19, 2023
6f6baf2
remove try from array v4
jayzhan211 Sep 19, 2023
891d425
cleanup expr_simply
jayzhan211 Sep 19, 2023
3441ebc
cleanup iter to array v3
jayzhan211 Sep 20, 2023
f177a64
checkpoint
jayzhan211 Sep 21, 2023
108f179
remove try from array v3
jayzhan211 Sep 21, 2023
ceefaa9
remove iter to array v4
jayzhan211 Sep 21, 2023
4fffb1f
rewrite iter to array list v2
jayzhan211 Sep 22, 2023
138fd9b
cleanup iter to array list v3
jayzhan211 Sep 22, 2023
b05318a
remove test
jayzhan211 Sep 22, 2023
b002f75
refactor wrap_into_list_array
jayzhan211 Sep 22, 2023
165f0fc
cleanup
jayzhan211 Sep 22, 2023
7f7580b
cleanup
jayzhan211 Sep 23, 2023
f756c61
revert math expressions
jayzhan211 Sep 23, 2023
51f94ed
remove iter to array v3
jayzhan211 Sep 23, 2023
07edc88
fmt
jayzhan211 Sep 23, 2023
be6afe7
cleanup
jayzhan211 Sep 23, 2023
8d8fc43
cleanup
jayzhan211 Sep 23, 2023
bc87f8b
cleanup
jayzhan211 Sep 23, 2023
c2ec867
cleanup
jayzhan211 Sep 23, 2023
937792b
fix check
jayzhan211 Sep 23, 2023
2d48fc2
fix build_a_list_array_from_scalars
jayzhan211 Sep 24, 2023
85a0da6
fmt
jayzhan211 Sep 24, 2023
0f0af8a
clippy fix
jayzhan211 Sep 24, 2023
aee9870
fix scalars test
jayzhan211 Sep 24, 2023
4600ed1
fix array agg distinct
jayzhan211 Sep 25, 2023
4b598f6
fix agg test
jayzhan211 Sep 27, 2023
c925179
cleanup
jayzhan211 Sep 28, 2023
4433c6b
add hash for scalar::Listarr
jayzhan211 Sep 30, 2023
544c0fd
fix count distinct
jayzhan211 Sep 30, 2023
6d96046
fmt
jayzhan211 Sep 30, 2023
bda350c
comment out scalarvalue list inproto
jayzhan211 Sep 30, 2023
73efc56
rename
jayzhan211 Oct 2, 2023
c018b7a
rename and add comment
jayzhan211 Oct 2, 2023
96b342a
reduce tryFromArray for List
jayzhan211 Oct 2, 2023
389a3ab
reduce tryFromArray for FixSizeList
jayzhan211 Oct 2, 2023
3768db1
fmt
jayzhan211 Oct 2, 2023
95626fa
proto for list
jayzhan211 Oct 3, 2023
57ea1fe
cleanup
jayzhan211 Oct 3, 2023
2403c9b
replace List with ListArr
jayzhan211 Oct 4, 2023
067049b
remove build a list ... func
jayzhan211 Oct 4, 2023
035c3d0
refactor merge_batch in DistinctBitXorAccumulator
jayzhan211 Oct 4, 2023
119e726
rewrite merge batch in ArrayAggAccumulator
jayzhan211 Oct 4, 2023
6e32c42
clippy
jayzhan211 Oct 4, 2023
c06d96b
cleanup
jayzhan211 Oct 4, 2023
f772d0d
return array directly in array()
jayzhan211 Oct 4, 2023
633afec
add expected message
jayzhan211 Oct 4, 2023
ef92173
fmt
jayzhan211 Oct 4, 2023
9d85585
rename add add doc test for core function
jayzhan211 Oct 4, 2023
2a16175
fix ci
jayzhan211 Oct 4, 2023
ca11115
fix type
jayzhan211 Oct 4, 2023
0e43051
rename and address comment
jayzhan211 Oct 11, 2023
76a0a73
address comment
jayzhan211 Oct 11, 2023
577612b
rewrite roundtrip scalars
jayzhan211 Oct 11, 2023
82c8802
fix flatten
jayzhan211 Oct 12, 2023
b7daf86
cleanup
jayzhan211 Oct 12, 2023
1924682
add partial cmp
jayzhan211 Oct 12, 2023
a11d5fd
fmt
jayzhan211 Oct 12, 2023
226d110
add comment
jayzhan211 Oct 13, 2023
116397a
cleanup
jayzhan211 Oct 13, 2023
c59ec00
Merge remote-tracking branch 'apache/main' into arrayref-for-scalarva…
alamb Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
replace List with ListArr
Signed-off-by: jayzhan211 <[email protected]>
jayzhan211 committed Oct 11, 2023
commit 2403c9b7b5c2869e420a0db3997bfb52d072b1ee
201 changes: 60 additions & 141 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -396,7 +396,7 @@ impl<'a> ConstEvaluator<'a> {
a.len()
)
} else if as_list_array(&a).is_ok() || as_large_list_array(&a).is_ok() {
Ok(ScalarValue::ListArr(a))
Ok(ScalarValue::List(a))
jayzhan211 marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Non-ListArray
ScalarValue::try_from_array(&a, 0)
14 changes: 7 additions & 7 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
@@ -158,13 +158,13 @@ impl Accumulator for ArrayAggAccumulator {

if element_arrays.is_empty() {
let arr = ScalarValue::list_to_array(&[], &self.datatype);
return Ok(ScalarValue::ListArr(arr));
return Ok(ScalarValue::List(arr));
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let list_array = wrap_into_list_array(concated_array);

Ok(ScalarValue::ListArr(Arc::new(list_array)))
Ok(ScalarValue::List(Arc::new(list_array)))
}

fn size(&self) -> usize {
@@ -207,7 +207,7 @@ mod tests {
Some(4),
Some(5),
])]);
let list = ScalarValue::ListArr(Arc::new(list));
let list = ScalarValue::List(Arc::new(list));

generic_test_op!(a, DataType::Int32, ArrayAgg, list, DataType::Int32)
}
@@ -258,10 +258,10 @@ mod tests {
arrow::compute::concat(&[&l1, &l2, &l3])?,
None,
);
let list = ScalarValue::ListArr(Arc::new(list));
let l1 = ScalarValue::ListArr(Arc::new(l1));
let l2 = ScalarValue::ListArr(Arc::new(l2));
let l3 = ScalarValue::ListArr(Arc::new(l3));
let list = ScalarValue::List(Arc::new(list));
let l1 = ScalarValue::List(Arc::new(l1));
let l2 = ScalarValue::List(Arc::new(l2));
let l3 = ScalarValue::List(Arc::new(l3));

let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();

18 changes: 9 additions & 9 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
@@ -162,7 +162,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
let arr = ScalarValue::list_to_array(&values, &self.datatype);
Ok(ScalarValue::ListArr(arr))
Ok(ScalarValue::List(arr))
}

fn size(&self) -> usize {
@@ -194,7 +194,7 @@ mod tests {
) -> Result<()> {
for expected in expected_values {
match (expected, &actual) {
(ScalarValue::ListArr(arr1), ScalarValue::ListArr(arr2)) => {
(ScalarValue::List(arr1), ScalarValue::List(arr2)) => {
if arr1.eq(arr2) {
return Ok(());
}
@@ -265,7 +265,7 @@ mod tests {
.into_iter()
.map(|x| Some(*x))
.collect::<Vec<_>>();
expected_values.push(ScalarValue::ListArr(Arc::new(
expected_values.push(ScalarValue::List(Arc::new(
ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(value)]),
)));
}
@@ -337,9 +337,9 @@ mod tests {
None,
);

let l1 = ScalarValue::ListArr(Arc::new(l1));
let l2 = ScalarValue::ListArr(Arc::new(l2));
let l3 = ScalarValue::ListArr(Arc::new(l3));
let l1 = ScalarValue::List(Arc::new(l1));
let l2 = ScalarValue::List(Arc::new(l2));
let l3 = ScalarValue::List(Arc::new(l3));

// Duplicate l1 in the input array and check that it is deduped in the output.
let array = ScalarValue::iter_to_array(vec![l1.clone(), l2, l3, l1]).unwrap();
@@ -399,9 +399,9 @@ mod tests {
None,
);

let l1 = ScalarValue::ListArr(Arc::new(l1));
let l2 = ScalarValue::ListArr(Arc::new(l2));
let l3 = ScalarValue::ListArr(Arc::new(l3));
let l1 = ScalarValue::List(Arc::new(l1));
let l2 = ScalarValue::List(Arc::new(l2));
let l3 = ScalarValue::List(Arc::new(l3));

// Duplicate l1 in the input array and check that it is deduped in the output.
let input1 = ScalarValue::iter_to_array(vec![l1.clone(), l2]).unwrap();
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
@@ -252,7 +252,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {

fn evaluate(&self) -> Result<ScalarValue> {
let arr = ScalarValue::list_to_array(&self.values, &self.datatypes[0]);
Ok(ScalarValue::ListArr(arr))
Ok(ScalarValue::List(arr))
}

fn size(&self) -> usize {
@@ -316,7 +316,7 @@ impl OrderSensitiveArrayAggAccumulator {
let struct_type = DataType::Struct(Fields::from(fields));

let arr = ScalarValue::list_to_array(&orderings, &struct_type);
Ok(ScalarValue::ListArr(arr))
Ok(ScalarValue::List(arr))
}
}

2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
Original file line number Diff line number Diff line change
@@ -644,7 +644,7 @@ where
.collect();

let arr = ScalarValue::list_to_array(&values, &T::DATA_TYPE);
vec![ScalarValue::ListArr(arr)]
vec![ScalarValue::List(arr)]
};
Ok(state_out)
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/count_distinct.rs
Original file line number Diff line number Diff line change
@@ -146,7 +146,7 @@ impl Accumulator for DistinctCountAccumulator {
let scalars = self.values.iter().cloned().collect::<Vec<_>>();
let arr =
ScalarValue::build_a_list_array_from_scalars(scalars, &self.state_data_type)?;
Ok(vec![ScalarValue::ListArr(arr)])
Ok(vec![ScalarValue::List(arr)])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/median.rs
Original file line number Diff line number Diff line change
@@ -153,7 +153,7 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
.collect();

let arr = ScalarValue::list_to_array(&all_values, &self.data_type);
Ok(vec![ScalarValue::ListArr(arr)])
Ok(vec![ScalarValue::List(arr)])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/sum_distinct.rs
Original file line number Diff line number Diff line change
@@ -166,7 +166,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
&self.data_type,
))
});
vec![ScalarValue::ListArr(ScalarValue::list_to_array(
vec![ScalarValue::List(ScalarValue::list_to_array(
&distinct_values,
&self.data_type,
))]
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/tdigest.rs
Original file line number Diff line number Diff line change
@@ -584,7 +584,7 @@ impl TDigest {
ScalarValue::Float64(Some(self.count)),
ScalarValue::Float64(Some(self.max)),
ScalarValue::Float64(Some(self.min)),
ScalarValue::ListArr(arr),
ScalarValue::List(arr),
]
}

@@ -605,7 +605,7 @@ impl TDigest {
};

let centroids: Vec<_> = match &state[5] {
ScalarValue::ListArr(arr) => {
ScalarValue::List(arr) => {
let list_array = as_list_array(arr);
let arr = list_array.values();
let f64arr = as_primitive_array::<Float64Type>(arr).unwrap();
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
@@ -421,7 +421,7 @@ fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
None => {
let null_arr = new_null_array(&DataType::Null, 0);
let list_arr = Arc::new(wrap_into_list_array(null_arr));
Ok(ColumnarValue::Scalar(ScalarValue::ListArr(list_arr)))
Ok(ColumnarValue::Scalar(ScalarValue::List(list_arr)))
}
// all nulls, set default data type as int32
Some(DataType::Null) => {
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/values.rs
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ impl ValuesExec {
match r {
Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
Ok(ScalarValue::ListArr(a))
Ok(ScalarValue::List(a))
}
Ok(ColumnarValue::Array(a)) => {
plan_err!(
2 changes: 1 addition & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
@@ -679,7 +679,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
)
.unwrap();
let arr = record_batch.column(0);
Self::ListArr(arr.to_owned())
Self::List(arr.to_owned())
}
Value::NullValue(v) => {
let null_type: DataType = v.try_into()?;
3 changes: 1 addition & 2 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
@@ -1145,8 +1145,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
"Proto serialization error: ScalarValue::Fixedsizelist not supported"
.to_string(),
)),
ScalarValue::List(..) => unimplemented!("Deprecated"),
ScalarValue::ListArr(arr) => {
ScalarValue::List(arr) => {
let batch =
RecordBatch::try_from_iter(vec![("field_name", arr.to_owned())])
.unwrap();
9 changes: 4 additions & 5 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
@@ -708,10 +708,10 @@ fn round_trip_scalar_list() {
]));

let should_pass: Vec<ScalarValue> = vec![
ScalarValue::ListArr(i32arr),
ScalarValue::ListArr(f32arr),
ScalarValue::ListArr(null_arr),
ScalarValue::ListArr(f32arr_with_null),
ScalarValue::List(i32arr),
ScalarValue::List(f32arr),
ScalarValue::List(null_arr),
ScalarValue::List(f32arr_with_null),
];

for test_case in should_pass.into_iter() {
@@ -942,7 +942,6 @@ fn roundtrip_null_scalar_values() {
ScalarValue::Date32(None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::TimestampNanosecond(None, None),
// ScalarValue::List(None, Arc::new(Field::new("item", DataType::Boolean, false))),
];

for test_case in test_types.into_iter() {
7 changes: 2 additions & 5 deletions datafusion/sql/src/expr/value.rs
Original file line number Diff line number Diff line change
@@ -154,16 +154,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
values.iter().map(|e| e.data_type()).collect();

if data_types.is_empty() {
Ok(lit(ScalarValue::ListArr(new_null_array(
&DataType::Null,
0,
))))
Ok(lit(ScalarValue::List(new_null_array(&DataType::Null, 0))))
} else if data_types.len() > 1 {
not_impl_err!("Arrays with different types are not supported: {data_types:?}")
} else {
let data_type = values[0].data_type();
let arr = ScalarValue::list_to_array(&values, &data_type);
Ok(lit(ScalarValue::ListArr(arr)))
Ok(lit(ScalarValue::List(arr)))
}
}