-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
avoid copying listarray in unset exec #7002
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,8 @@ | |
//! Defines the unnest column plan for unnesting values in a column that contains a list | ||
//! type, conceptually is like joining each row with all the values in the list column. | ||
use arrow::array::{ | ||
new_null_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, | ||
FixedSizeListArray, Int32Array, LargeListArray, ListArray, PrimitiveArray, | ||
Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, Int32Array, | ||
LargeListArray, ListArray, PrimitiveArray, | ||
}; | ||
use arrow::compute::kernels; | ||
use arrow::datatypes::{ | ||
|
@@ -236,21 +236,21 @@ fn build_batch( | |
match list_array.data_type() { | ||
DataType::List(_) => { | ||
let list_array = list_array.as_any().downcast_ref::<ListArray>().unwrap(); | ||
unnest_batch(batch, schema, column, &list_array) | ||
unnest_batch(batch, schema, column, &list_array, list_array.values()) | ||
} | ||
DataType::LargeList(_) => { | ||
let list_array = list_array | ||
.as_any() | ||
.downcast_ref::<LargeListArray>() | ||
.unwrap(); | ||
unnest_batch(batch, schema, column, &list_array) | ||
unnest_batch(batch, schema, column, &list_array, list_array.values()) | ||
} | ||
DataType::FixedSizeList(_, _) => { | ||
let list_array = list_array | ||
.as_any() | ||
.downcast_ref::<FixedSizeListArray>() | ||
.unwrap(); | ||
unnest_batch(batch, schema, column, list_array) | ||
unnest_batch(batch, schema, column, list_array, list_array.values()) | ||
} | ||
_ => Err(DataFusionError::Execution(format!( | ||
"Invalid unnest column {column}" | ||
|
@@ -263,43 +263,27 @@ fn unnest_batch<T>( | |
schema: &SchemaRef, | ||
column: &Column, | ||
list_array: &T, | ||
list_array_values: &Arc<dyn Array>, | ||
) -> Result<RecordBatch> | ||
where | ||
T: ArrayAccessor<Item = ArrayRef>, | ||
{ | ||
// Create an array with the unnested values of the list array, given the list | ||
// array: | ||
// | ||
// [1], null, [2, 3, 4], null, [5, 6] | ||
// | ||
// the result array is: | ||
// | ||
// 1, null, 2, 3, 4, null, 5, 6 | ||
// | ||
let unnested_array = unnest_array(list_array)?; | ||
|
||
// Create an array with the lengths of each list value in the nested array. | ||
// Given the nested array: | ||
// | ||
// [1], null, [2, 3, 4], null, [5, 6] | ||
// | ||
// the result array is: | ||
// | ||
// 1, null, 3, null, 2 | ||
// | ||
// Depending on the list type the result may be Int32Array or Int64Array. | ||
let list_lengths = list_lengths(list_array)?; | ||
|
||
// Create the indices for the take kernel and then use those indices to create | ||
// the unnested record batch. | ||
match list_lengths.data_type() { | ||
DataType::Int32 => { | ||
let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?; | ||
let unnested_array = | ||
unnest_array(list_array, list_array_values, list_lengths)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am still struggling to understand the need to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you clarify which take indices you mean? So unnest_array is simply needed as we need to change that column (physically unnest it). |
||
let indices = create_take_indices(list_lengths, unnested_array.len()); | ||
batch_from_indices(batch, schema, column.index(), &unnested_array, &indices) | ||
} | ||
DataType::Int64 => { | ||
let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?; | ||
let unnested_array = | ||
unnest_array(list_array, list_array_values, list_lengths)?; | ||
let indices = create_take_indices(list_lengths, unnested_array.len()); | ||
batch_from_indices(batch, schema, column.index(), &unnested_array, &indices) | ||
} | ||
|
@@ -357,14 +341,67 @@ where | |
builder.finish() | ||
} | ||
|
||
fn create_fixed_size_unnest_take_indices<T>( | ||
list_lengths: &PrimitiveArray<T>, | ||
value_length: usize, | ||
) -> PrimitiveArray<T> | ||
where | ||
T: ArrowPrimitiveType, | ||
{ | ||
let capacity = | ||
value_length * list_lengths.len() - list_lengths.null_count() * value_length; | ||
let mut builder = PrimitiveArray::<T>::builder(capacity); | ||
|
||
for row in 0..list_lengths.len() { | ||
if list_lengths.is_null(row) { | ||
builder.append_null(); | ||
} else { | ||
let fixed_length_offset = row * value_length; | ||
// Both `repeat` and `index` are positive intergers. | ||
(fixed_length_offset..fixed_length_offset + value_length).for_each(|r| { | ||
let index = T::Native::from_usize(r).unwrap(); | ||
builder.append_value(index); | ||
}); | ||
}; | ||
} | ||
|
||
builder.finish() | ||
} | ||
|
||
fn create_unnest_take_indices<T>( | ||
list_lengths: &PrimitiveArray<T>, | ||
capacity: usize, | ||
) -> PrimitiveArray<T> | ||
where | ||
T: ArrowPrimitiveType, | ||
{ | ||
let mut builder = PrimitiveArray::<T>::builder(capacity); | ||
let mut value_offset = 0; | ||
for row in 0..list_lengths.len() { | ||
if list_lengths.is_null(row) { | ||
builder.append_null(); | ||
} else { | ||
// Both `repeat` and `index` are positive intergers. | ||
let repeat = list_lengths.value(row).to_usize().unwrap(); | ||
(0..repeat).for_each(|r| { | ||
let index = T::Native::from_usize(r + value_offset).unwrap(); | ||
builder.append_value(index); | ||
}); | ||
value_offset += repeat; | ||
}; | ||
} | ||
|
||
builder.finish() | ||
} | ||
|
||
/// Create the final batch given the unnested column array and a `indices` array | ||
/// that is used by the take kernel to copy values. | ||
/// | ||
/// For example if we have the following `RecordBatch`: | ||
/// | ||
/// ```ignore | ||
/// c1: [1], null, [2, 3, 4], null, [5, 6] | ||
/// c2: 'a', 'b', 'c', null, 'd' | ||
/// c2: 'a', 'b', 'c', null, 'd' | ||
/// ``` | ||
/// | ||
/// then the `unnested_array` contains the unnest column that will replace `c1` in | ||
|
@@ -425,43 +462,61 @@ where | |
/// ```ignore | ||
/// 1, null, 2, 3, 4, null, 5, 6 | ||
/// ``` | ||
fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>> | ||
/// | ||
/// For [`DataType::FixedSizeList`] the values array will look like: | ||
/// | ||
/// ```ignore | ||
/// 1, 2, 3, null, null, null, 4, 5, 6 | ||
/// ``` | ||
/// | ||
/// While the other cases will omit nulls from the values array: | ||
/// | ||
/// ```ignore | ||
/// 1, 2, 3, 4, 5, 6 | ||
/// ``` | ||
/// Therefor we calculate take indices based on the underlying datatype. | ||
/// | ||
fn unnest_array<T, P>( | ||
list_array: &T, | ||
list_array_values: &Arc<dyn Array>, | ||
list_lengths: &PrimitiveArray<P>, | ||
) -> Result<Arc<dyn Array + 'static>> | ||
where | ||
T: ArrayAccessor<Item = ArrayRef>, | ||
P: ArrowPrimitiveType, | ||
{ | ||
let elem_type = match list_array.data_type() { | ||
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { | ||
f.data_type() | ||
} | ||
dt => { | ||
return Err(DataFusionError::Execution(format!( | ||
"Cannot unnest array of type {dt}" | ||
))) | ||
} | ||
}; | ||
|
||
let null_row = new_null_array(elem_type, 1); | ||
|
||
// Create a vec of ArrayRef from the list elements. | ||
let arrays = (0..list_array.len()) | ||
.map(|row| { | ||
if list_array.is_null(row) { | ||
null_row.clone() | ||
} else { | ||
list_array.value(row) | ||
if list_array.null_count() == 0 { | ||
Ok(list_array_values.clone()) | ||
} else { | ||
// For `FixedSizeList` the values array contains fixed length arrays, even if there are null values. | ||
// Therefor we need to calculate take indices accordingly. | ||
match list_array.data_type() { | ||
DataType::FixedSizeList(_, value_length) => { | ||
let take_indices = create_fixed_size_unnest_take_indices( | ||
list_lengths, | ||
*value_length as usize, | ||
); | ||
Ok(kernels::take::take(list_array_values, &take_indices, None)?) | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
|
||
// Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling | ||
// `as_ref()` in the `map` above causes the borrow checker to complain. | ||
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>(); | ||
|
||
Ok(kernels::concat::concat(&arrays)?) | ||
_ => { | ||
let capacity = list_array_values.len() + list_array.null_count(); | ||
let take_indices = create_unnest_take_indices(list_lengths, capacity); | ||
Ok(kernels::take::take(list_array_values, &take_indices, None)?) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Returns an array with the lengths of each list in `list_array`. Returns null | ||
/// for a null value. | ||
// Create an array with the lengths of each list value in the nested array. | ||
// Given the nested array: | ||
// | ||
// [1], null, [2, 3, 4], null, [5, 6] | ||
// | ||
// the result array is: | ||
// | ||
// 1, null, 3, null, 2 | ||
// | ||
// Depending on the list type the result may be Int32Array or Int64Array. | ||
fn list_lengths<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>> | ||
where | ||
T: ArrayAccessor<Item = ArrayRef>, | ||
|
@@ -489,3 +544,37 @@ where | |
))), | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
|
||
use super::*; | ||
|
||
#[test] | ||
fn calculate_unnest_take_indices() { | ||
let test_groups = PrimitiveArray::<Int32Type>::from(vec![ | ||
Some(3), | ||
None, | ||
Some(1), | ||
None, | ||
Some(2), | ||
None, | ||
]); | ||
|
||
let res = create_unnest_take_indices(&test_groups, 9); | ||
|
||
let expected = PrimitiveArray::<Int32Type>::from(vec![ | ||
Some(0), | ||
Some(1), | ||
Some(2), | ||
None, | ||
Some(3), | ||
None, | ||
Some(4), | ||
Some(5), | ||
None, | ||
]); | ||
|
||
assert_eq!(expected, res) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.values
seems to not have a common trait so it needs to be passed down here where we have the concrete typesThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this makes sense