Skip to content

Commit

Permalink
Add ListArray Constructors (#3879) (#4065)
Browse files Browse the repository at this point in the history
* Add ListArray constructors (#3879)

* More cleanup

* Checked arithmetic

* Add try_new

* Add tests

* Clippy

* Update arrow-array/src/array/list_array.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tustvold and alamb authored Apr 16, 2023
1 parent 682231c commit 472c977
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 29 deletions.
197 changes: 171 additions & 26 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
use crate::array::{get_offsets, make_array, print_long_array};
use crate::builder::{GenericListBuilder, PrimitiveBuilder};
use crate::{
iterator::GenericListArrayIter, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType,
iterator::GenericListArrayIter, new_empty_array, Array, ArrayAccessor, ArrayRef,
ArrowPrimitiveType,
};
use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, Field};
use arrow_schema::{ArrowError, DataType, FieldRef};
use num::Integer;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -73,13 +74,114 @@ impl<OffsetSize: OffsetSizeTrait> GenericListArray<OffsetSize> {
/// The data type constructor of list array.
/// The input is the schema of the child array and
/// the output is the [`DataType`], List or LargeList.
pub const DATA_TYPE_CONSTRUCTOR: fn(Arc<Field>) -> DataType = if OffsetSize::IS_LARGE
{
pub const DATA_TYPE_CONSTRUCTOR: fn(FieldRef) -> DataType = if OffsetSize::IS_LARGE {
DataType::LargeList
} else {
DataType::List
};

/// Create a new [`GenericListArray`] from the provided parts
///
/// # Errors
///
/// Errors if
///
/// * `offsets.len() - 1 != nulls.len()`
/// * `offsets.last() > values.len()`
/// * `!field.is_nullable() && values.null_count() != 0`
pub fn try_new(
    field: FieldRef,
    offsets: OffsetBuffer<OffsetSize>,
    values: ArrayRef,
    nulls: Option<NullBuffer>,
) -> Result<Self, ArrowError> {
    // An `OffsetBuffer` is guaranteed non-empty, so this cannot underflow.
    let len = offsets.len() - 1;

    // Only the final offset needs checking against `values`; the interior
    // offsets were validated when the `OffsetBuffer` was constructed.
    let end_offset = offsets.last().unwrap().as_usize();
    if end_offset > values.len() {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Max offset of {end_offset} exceeds length of values {}",
            values.len()
        )));
    }

    // A null buffer, when present, must have one entry per list element.
    match nulls.as_ref() {
        Some(n) if n.len() != len => {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of nulls for {}ListArray, expected {len} got {}",
                OffsetSize::PREFIX,
                n.len(),
            )));
        }
        _ => {}
    }

    // A non-nullable child field cannot be paired with child values
    // containing nulls.
    if values.null_count() != 0 && !field.is_nullable() {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Non-nullable field of {}ListArray {:?} cannot contain nulls",
            OffsetSize::PREFIX,
            field.name()
        )));
    }

    // The declared child field type must match the actual child array type.
    if values.data_type() != field.data_type() {
        return Err(ArrowError::InvalidArgumentError(format!(
            "{}ListArray expected data type {} got {} for {:?}",
            OffsetSize::PREFIX,
            field.data_type(),
            values.data_type(),
            field.name()
        )));
    }

    Ok(Self {
        data_type: Self::DATA_TYPE_CONSTRUCTOR(field),
        nulls,
        values,
        value_offsets: offsets,
    })
}

/// Create a new [`GenericListArray`] from the provided parts
///
/// # Panics
///
/// Panics if [`Self::try_new`] returns an error
pub fn new(
    field: FieldRef,
    offsets: OffsetBuffer<OffsetSize>,
    values: ArrayRef,
    nulls: Option<NullBuffer>,
) -> Self {
    // Infallible wrapper: all validation lives in `try_new`.
    let array = Self::try_new(field, offsets, values, nulls);
    array.unwrap()
}

/// Create a new [`GenericListArray`] of length `len` where all values are null
pub fn new_null(field: FieldRef, len: usize) -> Self {
    // Every offset is zero, so no element references any child value and the
    // child array can be empty; the null buffer marks every slot as null.
    Self {
        values: new_empty_array(field.data_type()),
        value_offsets: OffsetBuffer::new_zeroed(len),
        nulls: Some(NullBuffer::new_null(len)),
        data_type: Self::DATA_TYPE_CONSTRUCTOR(field),
    }
}

/// Deconstruct this array into its constituent parts
pub fn into_parts(
    self,
) -> (
    FieldRef,
    OffsetBuffer<OffsetSize>,
    ArrayRef,
    Option<NullBuffer>,
) {
    // `data_type` is always produced by `DATA_TYPE_CONSTRUCTOR`, so it must
    // be a List or LargeList; anything else is a broken internal invariant.
    let field = match self.data_type {
        DataType::List(f) => f,
        DataType::LargeList(f) => f,
        _ => unreachable!(),
    };
    (field, self.value_offsets, self.values, self.nulls)
}

/// Returns a reference to the offsets of this list
///
/// Unlike [`Self::value_offsets`] this returns the [`OffsetBuffer`]
Expand Down Expand Up @@ -405,31 +507,16 @@ mod tests {
use super::*;
use crate::builder::{Int32Builder, ListBuilder};
use crate::types::Int32Type;
use crate::Int32Array;
use arrow_buffer::{bit_util, Buffer, ToByteSlice};
use crate::{Int32Array, Int64Array};
use arrow_buffer::{bit_util, Buffer, ScalarBuffer};
use arrow_schema::Field;

fn create_from_buffers() -> ListArray {
    // Build the nested array [[0, 1, 2], [3, 4, 5], [6, 7]] using the new
    // safe constructor rather than raw ArrayData buffers.
    let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6, 8]));
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    ListArray::new(field, offsets, Arc::new(values), None)
}

#[test]
Expand Down Expand Up @@ -1029,4 +1116,62 @@ mod tests {
assert_eq!(string.len(), 0);
assert_eq!(string.value_offsets(), &[0]);
}

#[test]
fn test_try_new() {
// Three lists over five values: [ [1], [2, 3, 4], [5] ]
let offsets = OffsetBuffer::new(vec![0, 1, 4, 5].into());
let values = Int32Array::new(DataType::Int32, vec![1, 2, 3, 4, 5].into(), None);
let values = Arc::new(values) as ArrayRef;

// Valid construction with no null buffer succeeds.
let field = Arc::new(Field::new("element", DataType::Int32, false));
ListArray::new(field.clone(), offsets.clone(), values.clone(), None);

// Valid construction with a null buffer of matching length succeeds.
let nulls = NullBuffer::new_null(3);
ListArray::new(field.clone(), offsets, values.clone(), Some(nulls));

// Four lists but only three null entries: must be rejected.
let nulls = NullBuffer::new_null(3);
let offsets = OffsetBuffer::new(vec![0, 1, 2, 4, 5].into());
let err =
LargeListArray::try_new(field, offsets.clone(), values.clone(), Some(nulls))
.unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: Incorrect number of nulls for LargeListArray, expected 4 got 3"
);

// Field declares Int64 but the child values are Int32: must be rejected.
let field = Arc::new(Field::new("element", DataType::Int64, false));
let err =
LargeListArray::try_new(field.clone(), offsets.clone(), values.clone(), None)
.unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: LargeListArray expected data type Int64 got Int32 for \"element\""
);

// Non-nullable field paired with all-null child values: must be rejected.
let nulls = NullBuffer::new_null(7);
let values = Int64Array::new(DataType::Int64, vec![0; 7].into(), Some(nulls));
let values = Arc::new(values);

let err = LargeListArray::try_new(field, offsets.clone(), values.clone(), None)
.unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: Non-nullable field of LargeListArray \"element\" cannot contain nulls"
);

// Making the field nullable allows null child values.
let field = Arc::new(Field::new("element", DataType::Int64, true));
LargeListArray::new(field.clone(), offsets.clone(), values, None);

// Final offset (5) exceeds the child length (2): must be rejected.
let values = Int64Array::new(DataType::Int64, vec![0; 2].into(), None);
let err =
LargeListArray::try_new(field, offsets, Arc::new(values), None).unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: Max offset of 5 exceeds length of values 2"
);
}
}
36 changes: 33 additions & 3 deletions arrow-buffer/src/buffer/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
/// # Panics
///
/// Panics if `buffer` is not a non-empty buffer containing
/// monotonically increasing values greater than zero
/// monotonically increasing values greater than or equal to zero
pub fn new(buffer: ScalarBuffer<O>) -> Self {
assert!(!buffer.is_empty(), "offsets cannot be empty");
assert!(buffer[0] > O::usize_as(0), "offsets must be greater than 0");
assert!(
buffer[0] >= O::usize_as(0),
"offsets must be greater than 0"
);
assert!(
buffer.windows(2).all(|w| w[0] <= w[1]),
"offsets must be monotonically increasing"
Expand All @@ -45,7 +48,7 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
/// Create a new [`OffsetBuffer`] from the provided [`ScalarBuffer`] without
/// validating its contents
///
/// # Safety
///
/// `buffer` must be a non-empty buffer containing monotonically increasing
/// values greater than or equal to zero
pub unsafe fn new_unchecked(buffer: ScalarBuffer<O>) -> Self {
    // No checks performed here: the caller upholds the invariants above.
    Self(buffer)
}
Expand All @@ -56,6 +59,16 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
Self(buffer.into_buffer().into())
}

/// Create a new [`OffsetBuffer`] containing `len + 1` `0` values
pub fn new_zeroed(len: usize) -> Self {
    // `len + 1` offsets of `size_of::<O>()` bytes each; checked arithmetic
    // turns an absurd `len` into a panic rather than a wrapped allocation size.
    let num_offsets = len.checked_add(1);
    let len_bytes = num_offsets
        .and_then(|n| n.checked_mul(std::mem::size_of::<O>()))
        .expect("overflow");
    let zeroed = MutableBuffer::from_len_zeroed(len_bytes);
    Self(zeroed.into_buffer().into())
}

/// Returns the inner [`ScalarBuffer`]
pub fn inner(&self) -> &ScalarBuffer<O> {
&self.0
Expand Down Expand Up @@ -104,6 +117,23 @@ mod tests {
OffsetBuffer::new(vec![-1, 0, 1].into());
}

#[test]
fn offsets() {
// Monotonically non-decreasing offsets starting at zero are accepted.
OffsetBuffer::new(vec![0, 1, 2, 3].into());

// `new_zeroed(len)` produces `len + 1` zero offsets.
let offsets = OffsetBuffer::<i32>::new_zeroed(3);
assert_eq!(offsets.as_ref(), &[0; 4]);

// Even a zero-length buffer carries the single leading zero offset.
let offsets = OffsetBuffer::<i32>::new_zeroed(0);
assert_eq!(offsets.as_ref(), &[0; 1]);
}

#[test]
#[should_panic(expected = "overflow")]
fn offsets_new_zeroed_overflow() {
// usize::MAX + 1 overflows the checked length computation and must panic.
OffsetBuffer::<i32>::new_zeroed(usize::MAX);
}

#[test]
#[should_panic(expected = "offsets must be monotonically increasing")]
fn non_monotonic_offsets() {
Expand Down

0 comments on commit 472c977

Please sign in to comment.