Skip to content

Commit

Permalink
replace pop front and pop back
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Dec 6, 2023
1 parent cc1d81d commit fc9b23f
Showing 1 changed file with 20 additions and 165 deletions.
185 changes: 20 additions & 165 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,131 +370,6 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
}
}

fn return_empty(return_null: bool, data_type: DataType) -> Arc<dyn Array> {
if return_null {
new_null_array(&data_type, 1)
} else {
new_empty_array(&data_type)
}
}

fn list_slice<T: Array + 'static>(
array: &dyn Array,
i: i64,
j: i64,
return_element: bool,
) -> ArrayRef {
let array = array.as_any().downcast_ref::<T>().unwrap();

let array_type = array.data_type().clone();

if i == 0 && j == 0 || array.is_empty() {
return return_empty(return_element, array_type);
}

let i = match i.cmp(&0) {
Ordering::Less => {
if i.unsigned_abs() > array.len() as u64 {
return return_empty(true, array_type);
}

(array.len() as i64 + i + 1) as usize
}
Ordering::Equal => 1,
Ordering::Greater => i as usize,
};

let j = match j.cmp(&0) {
Ordering::Less => {
if j.unsigned_abs() as usize > array.len() {
return return_empty(true, array_type);
}
if return_element {
(array.len() as i64 + j + 1) as usize
} else {
(array.len() as i64 + j) as usize
}
}
Ordering::Equal => 1,
Ordering::Greater => j.min(array.len() as i64) as usize,
};

if i > j || i > array.len() {
return_empty(return_element, array_type)
} else {
Arc::new(array.slice(i - 1, j + 1 - i))
}
}

fn slice<T: Array + 'static>(
array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<Arc<dyn Array>> {
let sliced_array: Vec<Arc<dyn Array>> = array
.iter()
.zip(key.iter())
.zip(extra_key.iter())
.map(|((arr, i), j)| match (arr, i, j) {
(Some(arr), Some(i), Some(j)) => list_slice::<T>(&arr, i, j, return_element),
(Some(arr), None, Some(j)) => list_slice::<T>(&arr, 1i64, j, return_element),
(Some(arr), Some(i), None) => {
list_slice::<T>(&arr, i, arr.len() as i64, return_element)
}
(Some(arr), None, None) if !return_element => arr.clone(),
_ => return_empty(return_element, array.value_type()),
})
.collect();

// concat requires input of at least one array
if sliced_array.is_empty() {
Ok(return_empty(return_element, array.value_type()))
} else {
let vec = sliced_array
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();
let mut i: i32 = 0;
let mut offsets = vec![i];
offsets.extend(
vec.iter()
.map(|a| {
i += a.len() as i32;
i
})
.collect::<Vec<_>>(),
);
let values = compute::concat(vec.as_slice()).unwrap();

if return_element {
Ok(values)
} else {
let field = Arc::new(Field::new("item", array.value_type(), true));
Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
values,
None,
)?))
}
}
}

fn define_array_slice(
list_array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<ArrayRef> {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element)
};
}
call_array_function!(list_array.value_type(), true)
}

pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let indexes = as_int64_array(&args[1])?;
Expand Down Expand Up @@ -728,40 +603,18 @@ pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
)?))
}

fn general_array_pop(
list_array: &GenericListArray<i32>,
from_back: bool,
) -> Result<(Vec<i64>, Vec<i64>)> {
if from_back {
let key = vec![0; list_array.len()];
// Attention: `arr.len() - 1` in extra key defines the last element position (position = index + 1, not inclusive) we want in the new array.
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64 - 1))
.collect();
Ok((key, extra_key))
} else {
// Attention: 2 in the `key`` defines the first element position (position = index + 1) we want in the new array.
// We only handle two cases of the first element index: if the old array has any elements, starts from 2 (index + 1), or starts from initial.
let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_| 2)).collect();
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64))
.collect();
Ok((key, extra_key))
}
}

/// array_pop_back SQL function
pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, true)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![1; list_array.len()]);
let to_array = Int64Array::from(
list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Appends or prepends elements to a ListArray.
Expand Down Expand Up @@ -885,16 +738,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(arr)
}

/// array_pop_front SQL function
pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, false)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![2; list_array.len()]);
let to_array = Int64Array::from(
list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Array_append SQL function
Expand Down

0 comments on commit fc9b23f

Please sign in to comment.