From 95d9901c53ec38fa339fbae4d48c978199d98a05 Mon Sep 17 00:00:00 2001 From: smiklos Date: Mon, 17 Jul 2023 17:09:57 +0200 Subject: [PATCH 1/4] avoid copying listarray in unset exec --- datafusion/core/src/physical_plan/unnest.rs | 138 +++++++++++++------- 1 file changed, 89 insertions(+), 49 deletions(-) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 7a213dffeb51..ad280d9be486 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -18,8 +18,8 @@ //! Defines the unnest column plan for unnesting values in a column that contains a list //! type, conceptually is like joining each row with all the values in the list column. use arrow::array::{ - new_null_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, - FixedSizeListArray, Int32Array, LargeListArray, ListArray, PrimitiveArray, + Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, Int32Array, + LargeListArray, ListArray, PrimitiveArray, }; use arrow::compute::kernels; use arrow::datatypes::{ @@ -236,21 +236,21 @@ fn build_batch( match list_array.data_type() { DataType::List(_) => { let list_array = list_array.as_any().downcast_ref::().unwrap(); - unnest_batch(batch, schema, column, &list_array) + unnest_batch(batch, schema, column, &list_array, list_array.values()) } DataType::LargeList(_) => { let list_array = list_array .as_any() .downcast_ref::() .unwrap(); - unnest_batch(batch, schema, column, &list_array) + unnest_batch(batch, schema, column, &list_array, list_array.values()) } DataType::FixedSizeList(_, _) => { let list_array = list_array .as_any() .downcast_ref::() .unwrap(); - unnest_batch(batch, schema, column, list_array) + unnest_batch(batch, schema, column, list_array, list_array.values()) } _ => Err(DataFusionError::Execution(format!( "Invalid unnest column {column}" @@ -263,31 +263,11 @@ fn unnest_batch( schema: &SchemaRef, column: &Column, list_array: &T, + list_array_values: &Arc, ) -> Result where T: ArrayAccessor, { - // Create an array with the unnested values of the list array, given the list - // array: - // - // [1], null, [2, 3, 4], null, [5, 6] - // - // the result array is: - // - // 1, null, 2, 3, 4, null, 5, 6 - // - let unnested_array = unnest_array(list_array)?; - - // Create an array with the lengths of each list value in the nested array. - // Given the nested array: - // - // [1], null, [2, 3, 4], null, [5, 6] - // - // the result array is: - // - // 1, null, 3, null, 2 - // - // Depending on the list type the result may be Int32Array or Int64Array. let list_lengths = list_lengths(list_array)?; // Create the indices for the take kernel and then use those indices to create @@ -295,11 +275,15 @@ where match list_lengths.data_type() { DataType::Int32 => { let list_lengths = as_primitive_array::(&list_lengths)?; + let unnested_array = + unnest_array(list_array, list_array_values, list_lengths)?; let indices = create_take_indices(list_lengths, unnested_array.len()); batch_from_indices(batch, schema, column.index(), &unnested_array, &indices) } DataType::Int64 => { let list_lengths = as_primitive_array::(&list_lengths)?; + let unnested_array = + unnest_array(list_array, list_array_values, list_lengths)?; let indices = create_take_indices(list_lengths, unnested_array.len()); batch_from_indices(batch, schema, column.index(), &unnested_array, &indices) } @@ -357,6 +341,32 @@ where builder.finish() } +fn create_unnest_take_indices( + list_lengths: &PrimitiveArray, + capacity: usize, +) -> PrimitiveArray +where + T: ArrowPrimitiveType, +{ + let mut builder = PrimitiveArray::::builder(capacity); + let mut value_offset = 0; + for row in 0..list_lengths.len() { + if list_lengths.is_null(row) { + builder.append_null(); + } else { + // Both `repeat` and `index` are positive intergers. + let repeat = list_lengths.value(row).to_usize().unwrap(); + (0..repeat).for_each(|r| { + let index = T::Native::from_usize(r + value_offset).unwrap(); + builder.append_value(index); + }); + value_offset += repeat; + }; + } + + builder.finish() +} + /// Create the final batch given the unnested column array and a `indices` array /// that is used by the take kernel to copy values. /// @@ -364,7 +374,7 @@ where /// /// ```ignore /// c1: [1], null, [2, 3, 4], null, [5, 6] -/// c2: 'a', 'b', 'c', null, 'd' +/// c2: 'a', 'b', 'c', null, 'd' /// ``` /// /// then the `unnested_array` contains the unnest column that will replace `c1` in @@ -425,11 +435,16 @@ where /// ```ignore /// 1, null, 2, 3, 4, null, 5, 6 /// ``` -fn unnest_array(list_array: &T) -> Result> +fn unnest_array( + list_array: &T, + list_array_values: &Arc, + list_lengths: &PrimitiveArray

, +) -> Result> where T: ArrayAccessor, + P: ArrowPrimitiveType, { - let elem_type = match list_array.data_type() { + let _elem_type = match list_array.data_type() { DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { f.data_type() } @@ -440,28 +455,25 @@ where } }; - let null_row = new_null_array(elem_type, 1); - - // Create a vec of ArrayRef from the list elements. - let arrays = (0..list_array.len()) - .map(|row| { - if list_array.is_null(row) { - null_row.clone() - } else { - list_array.value(row) - } - }) - .collect::>(); - - // Create Vec<&dyn Array> from Vec> for `concat`. Calling - // `as_ref()` in the `map` above causes the borrow checker to complain. - let arrays = arrays.iter().map(|a| a.as_ref()).collect::>(); - - Ok(kernels::concat::concat(&arrays)?) + if list_array.null_count() > 0 { + let capacity = list_array_values.len() + list_array.null_count(); + let take_indices = create_unnest_take_indices(list_lengths, capacity); + Ok(kernels::take::take(list_array_values, &take_indices, None)?) + } else { + Ok(list_array_values.clone()) + } } -/// Returns an array with the lengths of each list in `list_array`. Returns null -/// for a null value. +// Create an array with the lengths of each list value in the nested array. +// Given the nested array: +// +// [1], null, [2, 3, 4], null, [5, 6] +// +// the result array is: +// +// 1, null, 3, null, 2 +// +// Depending on the list type the result may be Int32Array or Int64Array. fn list_lengths(list_array: &T) -> Result> where T: ArrayAccessor, @@ -489,3 +501,31 @@ where ))), } } + +#[test] +fn calculate_unnest_take_indices() { + let test_groups = PrimitiveArray::::from(vec![ + Some(3), + None, + Some(1), + None, + Some(2), + None, + ]); + + let res = create_unnest_take_indices(&test_groups, 9); + + let expected = PrimitiveArray::::from(vec![ + Some(0), + Some(1), + Some(2), + None, + Some(3), + None, + Some(4), + Some(5), + None, + ]); + + assert_eq!(expected, res) +} From 3d1cfb308fd1f7f644bcee7924da13cef9f4308c Mon Sep 17 00:00:00 2001 From: smiklos Date: Tue, 18 Jul 2023 12:20:03 +0200 Subject: [PATCH 2/4] remove redundant type validation --- datafusion/core/src/physical_plan/unnest.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index ad280d9be486..6d9ab397f7a6 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -444,17 +444,6 @@ where T: ArrayAccessor, P: ArrowPrimitiveType, { - let _elem_type = match list_array.data_type() { - DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { - f.data_type() - } - dt => { - return Err(DataFusionError::Execution(format!( - "Cannot unnest array of type {dt}" - ))) - } - }; - if list_array.null_count() > 0 { let capacity = list_array_values.len() + list_array.null_count(); let take_indices = create_unnest_take_indices(list_lengths, capacity); From 2ec5fcca06dc7af74de99d4d55bfbb2bdf751b16 Mon Sep 17 00:00:00 2001 From: smiklos Date: Tue, 18 Jul 2023 22:28:04 +0200 Subject: [PATCH 3/4] special case for FixedSizeList --- datafusion/core/src/physical_plan/unnest.rs | 124 +++++++++++++++----- 1 file changed, 92 insertions(+), 32 deletions(-) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 6d9ab397f7a6..0871c075095e 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -341,6 +341,33 @@ where builder.finish() } +fn create_fixed_size_unnest_take_indices( + list_lengths: &PrimitiveArray, + value_length: usize, +) -> PrimitiveArray +where + T: ArrowPrimitiveType, +{ + let capacity = + value_length * list_lengths.len() - list_lengths.null_count() * value_length; + let mut builder = PrimitiveArray::::builder(capacity); + + for row in 0..list_lengths.len() { + if list_lengths.is_null(row) { + builder.append_null(); + } else { + let fixed_length_offset = row * value_length; + // Both `repeat` and `index` are positive intergers. + (fixed_length_offset..fixed_length_offset + value_length).for_each(|r| { + let index = T::Native::from_usize(r).unwrap(); + builder.append_value(index); + }); + }; + } + + builder.finish() +} + fn create_unnest_take_indices( list_lengths: &PrimitiveArray, capacity: usize, @@ -435,6 +462,20 @@ where /// ```ignore /// 1, null, 2, 3, 4, null, 5, 6 /// ``` +/// +/// For [`DataType::FixedSizeList`] the values array will look like: +/// +/// ```ignore +/// 1, 2, 3, null, null, null, 4, 5, 6 +/// ``` +/// +/// While the other cases will omit nulls from the values array: +/// +/// ```ignore +/// 1, 2, 3, 4, 5, 6 +/// ``` +/// Therefor we calculate take indices based on the underlying datatype. +/// fn unnest_array( list_array: &T, list_array_values: &Arc, @@ -444,12 +485,25 @@ where T: ArrayAccessor, P: ArrowPrimitiveType, { - if list_array.null_count() > 0 { - let capacity = list_array_values.len() + list_array.null_count(); - let take_indices = create_unnest_take_indices(list_lengths, capacity); - Ok(kernels::take::take(list_array_values, &take_indices, None)?) - } else { - Ok(list_array_values.clone()) + // For `FixedSizeList` the values array contains fixed length arrays, even if there are null values. + // Therefor we must use the take kernel with take indices purpose built for `FixedSizeList`. + match list_array.data_type() { + DataType::FixedSizeList(_, value_length) => { + let take_indices = create_fixed_size_unnest_take_indices( + list_lengths, + *value_length as usize, + ); + Ok(kernels::take::take(list_array_values, &take_indices, None)?) + } + _ => { + if list_array.null_count() > 0 { + let capacity = list_array_values.len() + list_array.null_count(); + let take_indices = create_unnest_take_indices(list_lengths, capacity); + Ok(kernels::take::take(list_array_values, &take_indices, None)?) + } else { + Ok(list_array_values.clone()) + } + } } } @@ -491,30 +545,36 @@ where } } -#[test] -fn calculate_unnest_take_indices() { - let test_groups = PrimitiveArray::::from(vec![ - Some(3), - None, - Some(1), - None, - Some(2), - None, - ]); - - let res = create_unnest_take_indices(&test_groups, 9); - - let expected = PrimitiveArray::::from(vec![ - Some(0), - Some(1), - Some(2), - None, - Some(3), - None, - Some(4), - Some(5), - None, - ]); - - assert_eq!(expected, res) +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn calculate_unnest_take_indices() { + let test_groups = PrimitiveArray::::from(vec![ + Some(3), + None, + Some(1), + None, + Some(2), + None, + ]); + + let res = create_unnest_take_indices(&test_groups, 9); + + let expected = PrimitiveArray::::from(vec![ + Some(0), + Some(1), + Some(2), + None, + Some(3), + None, + Some(4), + Some(5), + None, + ]); + + assert_eq!(expected, res) + } } From b8c29eab7def0e1d28e8c6f60530c946b66cf720 Mon Sep 17 00:00:00 2001 From: smiklos Date: Tue, 18 Jul 2023 23:22:29 +0200 Subject: [PATCH 4/4] optimization and test --- datafusion/core/src/physical_plan/unnest.rs | 28 ++++----- datafusion/core/tests/dataframe/mod.rs | 66 +++++++++++++++++++++ 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 0871c075095e..1098f94b3eda 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -485,23 +485,23 @@ where T: ArrayAccessor, P: ArrowPrimitiveType, { - // For `FixedSizeList` the values array contains fixed length arrays, even if there are null values. - // Therefor we must use the take kernel with take indices purpose built for `FixedSizeList`. - match list_array.data_type() { - DataType::FixedSizeList(_, value_length) => { - let take_indices = create_fixed_size_unnest_take_indices( - list_lengths, - *value_length as usize, - ); - Ok(kernels::take::take(list_array_values, &take_indices, None)?) - } - _ => { - if list_array.null_count() > 0 { + if list_array.null_count() == 0 { + Ok(list_array_values.clone()) + } else { + // For `FixedSizeList` the values array contains fixed length arrays, even if there are null values. + // Therefor we need to calculate take indices accordingly. + match list_array.data_type() { + DataType::FixedSizeList(_, value_length) => { + let take_indices = create_fixed_size_unnest_take_indices( + list_lengths, + *value_length as usize, + ); + Ok(kernels::take::take(list_array_values, &take_indices, None)?) + } + _ => { let capacity = list_array_values.len() + list_array.null_count(); let take_indices = create_unnest_take_indices(list_lengths, capacity); Ok(kernels::take::take(list_array_values, &take_indices, None)?) - } else { - Ok(list_array_values.clone()) } } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aecec35f2e16..21f62a63c16c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1114,6 +1114,72 @@ async fn unnest_fixed_list() -> Result<()> { Ok(()) } +#[tokio::test] +async fn unnest_fixed_list_nonull() -> Result<()> { + let mut shape_id_builder = UInt32Builder::new(); + let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2); + + for idx in 0..6 { + // Append shape id. + shape_id_builder.append_value(idx as u32 + 1); + + tags_builder + .values() + .append_value(format!("tag{}1", idx + 1)); + tags_builder + .values() + .append_value(format!("tag{}2", idx + 1)); + tags_builder.append(true); + } + + let batch = RecordBatch::try_from_iter(vec![ + ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef), + ("tags", Arc::new(tags_builder.finish()) as ArrayRef), + ])?; + + let ctx = SessionContext::new(); + ctx.register_batch("shapes", batch)?; + let df = ctx.table("shapes").await?; + + let results = df.clone().collect().await?; + let expected = vec![ + "+----------+----------------+", + "| shape_id | tags |", + "+----------+----------------+", + "| 1 | [tag11, tag12] |", + "| 2 | [tag21, tag22] |", + "| 3 | [tag31, tag32] |", + "| 4 | [tag41, tag42] |", + "| 5 | [tag51, tag52] |", + "| 6 | [tag61, tag62] |", + "+----------+----------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + let results = df.unnest_column("tags")?.collect().await?; + let expected = vec![ + "+----------+-------+", + "| shape_id | tags |", + "+----------+-------+", + "| 1 | tag11 |", + "| 1 | tag12 |", + "| 2 | tag21 |", + "| 2 | tag22 |", + "| 3 | tag31 |", + "| 3 | tag32 |", + "| 4 | tag41 |", + "| 4 | tag42 |", + "| 5 | tag51 |", + "| 5 | tag52 |", + "| 6 | tag61 |", + "| 6 | tag62 |", + "+----------+-------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn unnest_aggregate_columns() -> Result<()> { const NUM_ROWS: usize = 5;