Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Jul 7, 2023
1 parent f26c540 commit 8f821d9
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 23 deletions.
8 changes: 8 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,14 @@ select make_array(f0) from fixed_size_list_array
----
[[1, 2], [3, 4]]

query ?
select array_concat(column1, make_array(0)) from arrays_values_without_nulls;
----
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 0]
[31, 32, 33, 34, 35, 26, 37, 38, 39, 40, 0]


### Delete tables

Expand Down
269 changes: 246 additions & 23 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,35 +373,184 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(res)
}

macro_rules! concat_internal {
($args:expr, $DataType:ident, $ArrayType:ident) => {{
let list_arrays =
downcast_vec!($args, ListArray).collect::<Result<Vec<&ListArray>>>()?;

let column_len = $args[0].len();

let mut array_values =
vec![
downcast_arg!(new_empty_array(&DataType::$DataType), $ArrayType).clone();
column_len
];

let mut array_lens = vec![0; column_len];

for list_arr in list_arrays.iter() {
for (idx, (value, arr)) in
array_values.iter_mut().zip(list_arr.iter()).enumerate()
{
match arr {
Some(child_array) => {
// Skip null array
if child_array.as_any().downcast_ref::<NullArray>().is_some() {
continue;
}

let child_array = downcast_arg!(child_array, $ArrayType);
let concat_args = compute::concat(&[value, child_array])?;
*value = downcast_arg!(concat_args, $ArrayType).clone();
array_lens[idx] += child_array.len();
}
None => {
// TODO: Not support error
}
}
}
}

let offsets: Vec<i32> = std::iter::once(0)
.chain(array_lens.iter().scan(0, |state, &x| {
*state += x;
Some(*state as i32)
}))
.collect();

let values: Vec<&dyn Array> =
array_values.iter().map(|arr| arr as &dyn Array).collect();
let merged_values = compute::concat(values.as_slice())?;

let field = Arc::new(Field::new("item", DataType::$DataType, true));

Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(merged_values),
None,
)?))
}};
}

fn concat_(args: &[ArrayRef]) -> Result<ArrayRef> {
// println!("args: {:?}", args);
let list_arrays =
downcast_vec!(args, ListArray).collect::<Result<Vec<&ListArray>>>()?;
// println!("list_arrays: {:?}", list_arrays);

let column_len = args[0].len();

let mut array_values =
vec![
downcast_arg!(new_empty_array(&DataType::Int64), Int64Array).clone();
column_len
];

let mut array_lens = vec![0; column_len];

for list_arr in list_arrays.iter() {
for (idx, (value, arr)) in
array_values.iter_mut().zip(list_arr.iter()).enumerate()
{
match arr {
Some(child_array) => {
// Skip null array
if child_array.as_any().downcast_ref::<NullArray>().is_some() {
continue;
}

let child_array = downcast_arg!(child_array, Int64Array);
let concat_args = compute::concat(&[value, child_array])?;
*value = downcast_arg!(concat_args, Int64Array).clone();
array_lens[idx] += child_array.len();
}
None => {
// TODO: Not support error
}
}
}
}

let offsets: Vec<i32> = std::iter::once(0)
.chain(array_lens.iter().scan(0, |state, &x| {
*state += x;
Some(*state as i32)
}))
.collect();

let values: Vec<&dyn Array> =
array_values.iter().map(|arr| arr as &dyn Array).collect();
let merged_values = compute::concat(values.as_slice())?;

let field = Arc::new(Field::new("item", DataType::Int64, true));

Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(merged_values),
None,
)?))
}

fn old_concat(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_arrays =
downcast_vec!(args, ListArray).collect::<Result<Vec<&ListArray>>>()?;
let len: usize = list_arrays.iter().map(|a| a.values().len()).sum();
let capacity = Capacities::Array(list_arrays.iter().map(|a| a.len()).sum());
let array_data: Vec<_> = list_arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
let array_data = array_data.iter().collect();
let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);

for (i, a) in list_arrays.iter().enumerate() {
mutable.extend(i, 0, a.len())
}

let builder = mutable.into_builder();
let list = builder
.len(1)
.buffers(vec![Buffer::from_slice_ref([0, len as i32])])
.build()
.unwrap();
Ok(Arc::new(arrow::array::make_array(list)))

}

/// Array_concat/Array_cat SQL function
pub fn array_concat(args: &[ArrayRef]) -> Result<ArrayRef> {
match args[0].data_type() {
DataType::List(field) => match field.data_type() {
DataType::Null => array_concat(&args[1..]),
_ => {
let list_arrays = downcast_vec!(args, ListArray)
.collect::<Result<Vec<&ListArray>>>()?;
let len: usize = list_arrays.iter().map(|a| a.values().len()).sum();
let capacity =
Capacities::Array(list_arrays.iter().map(|a| a.len()).sum());
let array_data: Vec<_> =
list_arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
let array_data = array_data.iter().collect();
let mut mutable =
MutableArrayData::with_capacities(array_data, false, capacity);

for (i, a) in list_arrays.iter().enumerate() {
mutable.extend(i, 0, a.len())
DataType::List(_) => {
old_concat(args)
}
data_type => {
// println!("data_type: {:?}", data_type);
// println!("args: {:?}", args);
// concat_(args)
// old_concat(args)

match data_type {
DataType::Int64 => concat_internal!(args, Int64, Int64Array),
DataType::Int32 => concat_internal!(args, Int32, Int32Array),
DataType::Int16 => concat_internal!(args, Int16, Int16Array),
DataType::Int8 => concat_internal!(args, Int8, Int8Array),
DataType::UInt64 => concat_internal!(args, UInt64, UInt64Array),
DataType::UInt32 => concat_internal!(args, UInt32, UInt32Array),
DataType::UInt16 => concat_internal!(args, UInt16, UInt16Array),
DataType::UInt8 => concat_internal!(args, UInt8, UInt8Array),
DataType::Float64 => concat_internal!(args, Float64, Float64Array),
DataType::Float32 => concat_internal!(args, Float32, Float32Array),
DataType::Boolean => concat_internal!(args, Boolean, BooleanArray),
DataType::Utf8 => concat_internal!(args, Utf8, StringArray),
DataType::LargeUtf8 => {
concat_internal!(args, LargeUtf8, LargeStringArray)
}
data_type => Err(DataFusionError::NotImplemented(format!(
"Array_concat is not implemented for type '{:?}'.",
data_type
))),
}

let builder = mutable.into_builder();
let list = builder
.len(1)
.buffers(vec![Buffer::from_slice_ref([0, len as i32])])
.build()
.unwrap();

return Ok(Arc::new(arrow::array::make_array(list)));
}
},
data_type => Err(DataFusionError::NotImplemented(format!(
Expand All @@ -410,6 +559,80 @@ pub fn array_concat(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

// [ListArray
// [
// PrimitiveArray<Int64>
// [
// 1,
// 2,
// 3,
// 4,
// 5,
// 6,
// 7,
// 8,
// 9,
// 10,
// ],
// PrimitiveArray<Int64>
// [
// 11,
// 12,
// 13,
// 14,
// 15,
// 16,
// 17,
// 18,
// 19,
// 20,
// ],
// PrimitiveArray<Int64>
// [
// 21,
// 22,
// 23,
// 24,
// 25,
// 26,
// 27,
// 28,
// 29,
// 30,
// ],
// PrimitiveArray<Int64>
// [
// 31,
// 32,
// 33,
// 34,
// 35,
// 26,
// 37,
// 38,
// 39,
// 40,
// ],
// ], ListArray
// [
// PrimitiveArray<Int64>
// [
// 0,
// ],
// PrimitiveArray<Int64>
// [
// 0,
// ],
// PrimitiveArray<Int64>
// [
// 0,
// ],
// PrimitiveArray<Int64>
// [
// 0,
// ],
// ]]

macro_rules! fill {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let arr = downcast_arg!($ARRAY, $ARRAY_TYPE);
Expand Down

0 comments on commit 8f821d9

Please sign in to comment.