diff --git a/datafusion/core/tests/sqllogictests/test_files/array.slt b/datafusion/core/tests/sqllogictests/test_files/array.slt index 7eebb23d9cc9c..8411578322663 100644 --- a/datafusion/core/tests/sqllogictests/test_files/array.slt +++ b/datafusion/core/tests/sqllogictests/test_files/array.slt @@ -714,6 +714,14 @@ select make_array(f0) from fixed_size_list_array ---- [[1, 2], [3, 4]] +query ? +select array_concat(column1, make_array(0)) from arrays_values_without_nulls; +---- +[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0] +[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0] +[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 0] +[31, 32, 33, 34, 35, 26, 37, 38, 39, 40, 0] + ### Delete tables diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index cd174918db379..b975922da8336 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -373,35 +373,184 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result { Ok(res) } +macro_rules! concat_internal { + ($args:expr, $DataType:ident, $ArrayType:ident) => {{ + let list_arrays = + downcast_vec!($args, ListArray).collect::>>()?; + + let column_len = $args[0].len(); + + let mut array_values = + vec![ + downcast_arg!(new_empty_array(&DataType::$DataType), $ArrayType).clone(); + column_len + ]; + + let mut array_lens = vec![0; column_len]; + + for list_arr in list_arrays.iter() { + for (idx, (value, arr)) in + array_values.iter_mut().zip(list_arr.iter()).enumerate() + { + match arr { + Some(child_array) => { + // Skip null array + if child_array.as_any().downcast_ref::().is_some() { + continue; + } + + let child_array = downcast_arg!(child_array, $ArrayType); + let concat_args = compute::concat(&[value, child_array])?; + *value = downcast_arg!(concat_args, $ArrayType).clone(); + array_lens[idx] += child_array.len(); + } + None => { + // TODO: Not support error + } + } + } + } + + let offsets: Vec = std::iter::once(0) + .chain(array_lens.iter().scan(0, |state, &x| { + *state += x; + Some(*state as i32) + })) + .collect(); + + let values: Vec<&dyn Array> = + array_values.iter().map(|arr| arr as &dyn Array).collect(); + let merged_values = compute::concat(values.as_slice())?; + + let field = Arc::new(Field::new("item", DataType::$DataType, true)); + + Ok(Arc::new(ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(merged_values), + None, + )?)) + }}; +} + +fn concat_(args: &[ArrayRef]) -> Result { + // println!("args: {:?}", args); + let list_arrays = + downcast_vec!(args, ListArray).collect::>>()?; + // println!("list_arrays: {:?}", list_arrays); + + let column_len = args[0].len(); + + let mut array_values = + vec![ + downcast_arg!(new_empty_array(&DataType::Int64), Int64Array).clone(); + column_len + ]; + + let mut array_lens = vec![0; column_len]; + + for list_arr in list_arrays.iter() { + for (idx, (value, arr)) in + array_values.iter_mut().zip(list_arr.iter()).enumerate() + { + match arr { + Some(child_array) => { + // Skip null array + if child_array.as_any().downcast_ref::().is_some() { + continue; + } + + let child_array = downcast_arg!(child_array, Int64Array); + let concat_args = compute::concat(&[value, child_array])?; + *value = downcast_arg!(concat_args, Int64Array).clone(); + array_lens[idx] += child_array.len(); + } + None => { + // TODO: Not support error + } + } + } + } + + let offsets: Vec = std::iter::once(0) + .chain(array_lens.iter().scan(0, |state, &x| { + *state += x; + Some(*state as i32) + })) + .collect(); + + let values: Vec<&dyn Array> = + array_values.iter().map(|arr| arr as &dyn Array).collect(); + let merged_values = compute::concat(values.as_slice())?; + + let field = Arc::new(Field::new("item", DataType::Int64, true)); + + Ok(Arc::new(ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(merged_values), + None, + )?)) +} + +fn old_concat(args: &[ArrayRef]) -> Result { + let list_arrays = + downcast_vec!(args, ListArray).collect::>>()?; + let len: usize = list_arrays.iter().map(|a| a.values().len()).sum(); + let capacity = Capacities::Array(list_arrays.iter().map(|a| a.len()).sum()); + let array_data: Vec<_> = list_arrays.iter().map(|a| a.to_data()).collect::>(); + let array_data = array_data.iter().collect(); + let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity); + + for (i, a) in list_arrays.iter().enumerate() { + mutable.extend(i, 0, a.len()) + } + + let builder = mutable.into_builder(); + let list = builder + .len(1) + .buffers(vec![Buffer::from_slice_ref([0, len as i32])]) + .build() + .unwrap(); + Ok(Arc::new(arrow::array::make_array(list))) + +} + /// Array_concat/Array_cat SQL function pub fn array_concat(args: &[ArrayRef]) -> Result { match args[0].data_type() { DataType::List(field) => match field.data_type() { DataType::Null => array_concat(&args[1..]), - _ => { - let list_arrays = downcast_vec!(args, ListArray) - .collect::>>()?; - let len: usize = list_arrays.iter().map(|a| a.values().len()).sum(); - let capacity = - Capacities::Array(list_arrays.iter().map(|a| a.len()).sum()); - let array_data: Vec<_> = - list_arrays.iter().map(|a| a.to_data()).collect::>(); - let array_data = array_data.iter().collect(); - let mut mutable = - MutableArrayData::with_capacities(array_data, false, capacity); - - for (i, a) in list_arrays.iter().enumerate() { - mutable.extend(i, 0, a.len()) + DataType::List(_) => { + old_concat(args) + } + data_type => { + // println!("data_type: {:?}", data_type); + // println!("args: {:?}", args); + // concat_(args) + // old_concat(args) + + match data_type { + DataType::Int64 => concat_internal!(args, Int64, Int64Array), + DataType::Int32 => concat_internal!(args, Int32, Int32Array), + DataType::Int16 => concat_internal!(args, Int16, Int16Array), + DataType::Int8 => concat_internal!(args, Int8, Int8Array), + DataType::UInt64 => concat_internal!(args, UInt64, UInt64Array), + DataType::UInt32 => concat_internal!(args, UInt32, UInt32Array), + DataType::UInt16 => concat_internal!(args, UInt16, UInt16Array), + DataType::UInt8 => concat_internal!(args, UInt8, UInt8Array), + DataType::Float64 => concat_internal!(args, Float64, Float64Array), + DataType::Float32 => concat_internal!(args, Float32, Float32Array), + DataType::Boolean => concat_internal!(args, Boolean, BooleanArray), + DataType::Utf8 => concat_internal!(args, Utf8, StringArray), + DataType::LargeUtf8 => { + concat_internal!(args, LargeUtf8, LargeStringArray) + } + data_type => Err(DataFusionError::NotImplemented(format!( + "Array_concat is not implemented for type '{:?}'.", + data_type + ))), } - - let builder = mutable.into_builder(); - let list = builder - .len(1) - .buffers(vec![Buffer::from_slice_ref([0, len as i32])]) - .build() - .unwrap(); - - return Ok(Arc::new(arrow::array::make_array(list))); } }, data_type => Err(DataFusionError::NotImplemented(format!( @@ -410,6 +559,80 @@ pub fn array_concat(args: &[ArrayRef]) -> Result { } } +// [ListArray +// [ +// PrimitiveArray +// [ +// 1, +// 2, +// 3, +// 4, +// 5, +// 6, +// 7, +// 8, +// 9, +// 10, +// ], +// PrimitiveArray +// [ +// 11, +// 12, +// 13, +// 14, +// 15, +// 16, +// 17, +// 18, +// 19, +// 20, +// ], +// PrimitiveArray +// [ +// 21, +// 22, +// 23, +// 24, +// 25, +// 26, +// 27, +// 28, +// 29, +// 30, +// ], +// PrimitiveArray +// [ +// 31, +// 32, +// 33, +// 34, +// 35, +// 26, +// 37, +// 38, +// 39, +// 40, +// ], +// ], ListArray +// [ +// PrimitiveArray +// [ +// 0, +// ], +// PrimitiveArray +// [ +// 0, +// ], +// PrimitiveArray +// [ +// 0, +// ], +// PrimitiveArray +// [ +// 0, +// ], +// ]] + macro_rules! fill { ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{ let arr = downcast_arg!($ARRAY, $ARRAY_TYPE);