Skip to content

Commit

Permalink
Flesh out NullBuffer abstraction (apache#3880) (apache#3885)
Browse files Browse the repository at this point in the history
* Flesh out NullBuffer abstraction (apache#3880)

* Review feedback
  • Loading branch information
tustvold authored and spebern committed Mar 25, 2023
1 parent 2ae9cf7 commit f59c889
Show file tree
Hide file tree
Showing 18 changed files with 170 additions and 144 deletions.
72 changes: 21 additions & 51 deletions arrow-arith/src/arity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,25 @@ use arrow_array::builder::BufferBuilder;
use arrow_array::iterator::ArrayIter;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::bit_iterator::try_for_each_valid_idx;
use arrow_data::bit_mask::combine_option_bitmap;
use arrow_data::ArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::ArrowError;
use std::sync::Arc;

#[inline]
unsafe fn build_primitive_array<O: ArrowPrimitiveType>(
len: usize,
buffer: Buffer,
null_count: usize,
null_buffer: Option<Buffer>,
nulls: Option<NullBuffer>,
) -> PrimitiveArray<O> {
PrimitiveArray::from(ArrayData::new_unchecked(
O::DATA_TYPE,
len,
Some(null_count),
null_buffer,
0,
vec![buffer],
vec![],
))
PrimitiveArray::from(
ArrayDataBuilder::new(O::DATA_TYPE)
.len(len)
.nulls(nulls)
.buffers(vec![buffer])
.build_unchecked(),
)
}

/// See [`PrimitiveArray::unary`]
Expand Down Expand Up @@ -220,11 +215,7 @@ where
return Ok(PrimitiveArray::from(ArrayData::new_empty(&O::DATA_TYPE)));
}

let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);
let null_count = null_buffer
.as_ref()
.map(|x| len - x.count_set_bits_offset(0, len))
.unwrap_or_default();
let nulls = NullBuffer::union(a.nulls(), b.nulls());

let values = a.values().iter().zip(b.values()).map(|(l, r)| op(*l, *r));
// JUSTIFICATION
Expand All @@ -234,7 +225,7 @@ where
// `values` is an iterator with a known size from a PrimitiveArray
let buffer = unsafe { Buffer::from_trusted_len_iter(values) };

Ok(unsafe { build_primitive_array(len, buffer, null_count, null_buffer) })
Ok(unsafe { build_primitive_array(len, buffer, nulls) })
}

/// Given two arrays of length `len`, calls `op(a[i], b[i])` for `i` in `0..len`, mutating
Expand Down Expand Up @@ -275,10 +266,7 @@ where
))));
}

let len = a.len();

let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);
let nulls = null_buffer.map(|b| NullBuffer::new(BooleanBuffer::new(b, 0, len)));
let nulls = NullBuffer::union(a.nulls(), b.nulls());

let mut builder = a.into_builder()?;

Expand Down Expand Up @@ -326,28 +314,21 @@ where
if a.null_count() == 0 && b.null_count() == 0 {
try_binary_no_nulls(len, a, b, op)
} else {
let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);

let null_count = null_buffer
.as_ref()
.map(|x| len - x.count_set_bits_offset(0, len))
.unwrap_or_default();
let nulls = NullBuffer::union(a.nulls(), b.nulls()).unwrap();

let mut buffer = BufferBuilder::<O::Native>::new(len);
buffer.append_n_zeroed(len);
let slice = buffer.as_slice_mut();

try_for_each_valid_idx(len, 0, null_count, null_buffer.as_deref(), |idx| {
nulls.try_for_each_valid_idx(|idx| {
unsafe {
*slice.get_unchecked_mut(idx) =
op(a.value_unchecked(idx), b.value_unchecked(idx))?
};
Ok::<_, ArrowError>(())
})?;

Ok(unsafe {
build_primitive_array(len, buffer.finish(), null_count, null_buffer)
})
Ok(unsafe { build_primitive_array(len, buffer.finish(), Some(nulls)) })
}
}

Expand Down Expand Up @@ -391,17 +372,12 @@ where
if a.null_count() == 0 && b.null_count() == 0 {
try_binary_no_nulls_mut(len, a, b, op)
} else {
let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);
let null_count = null_buffer
.as_ref()
.map(|x| len - x.count_set_bits_offset(0, len))
.unwrap_or_default();

let nulls = NullBuffer::union(a.nulls(), b.nulls()).unwrap();
let mut builder = a.into_builder()?;

let slice = builder.values_slice_mut();

match try_for_each_valid_idx(len, 0, null_count, null_buffer.as_deref(), |idx| {
match nulls.try_for_each_valid_idx(|idx| {
unsafe {
*slice.get_unchecked_mut(idx) =
op(*slice.get_unchecked(idx), b.value_unchecked(idx))?
Expand All @@ -412,14 +388,8 @@ where
Err(err) => return Ok(Err(err)),
};

let array_builder = builder
.finish()
.into_data()
.into_builder()
.null_bit_buffer(null_buffer)
.null_count(null_count);

let array_data = unsafe { array_builder.build_unchecked() };
let array_builder = builder.finish().into_data().into_builder();
let array_data = unsafe { array_builder.nulls(Some(nulls)).build_unchecked() };
Ok(Ok(PrimitiveArray::<T>::from(array_data)))
}
}
Expand All @@ -442,7 +412,7 @@ where
buffer.push_unchecked(op(a.value_unchecked(idx), b.value_unchecked(idx))?);
};
}
Ok(unsafe { build_primitive_array(len, buffer.into(), 0, None) })
Ok(unsafe { build_primitive_array(len, buffer.into(), None) })
}

/// This intentional inline(never) attribute helps LLVM optimize the loop.
Expand Down
30 changes: 11 additions & 19 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ use crate::array::print_long_array;
use crate::builder::BooleanBuilder;
use crate::iterator::BooleanIter;
use crate::{Array, ArrayAccessor, ArrayRef};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{bit_util, Buffer, MutableBuffer};
use arrow_data::bit_mask::combine_option_bitmap;
use arrow_data::ArrayData;
use arrow_buffer::{bit_util, Buffer, MutableBuffer, NullBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -189,7 +187,7 @@ impl BooleanArray {
where
F: FnMut(T::Item) -> bool,
{
let null_bit_buffer = left.data().nulls().map(|x| x.inner().sliced());
let null_bit_buffer = left.nulls().map(|x| x.inner().sliced());
let buffer = MutableBuffer::collect_bool(left.len(), |i| unsafe {
// SAFETY: i in range 0..len
op(left.value_unchecked(i))
Expand Down Expand Up @@ -235,24 +233,18 @@ impl BooleanArray {
{
assert_eq!(left.len(), right.len());

let null_bit_buffer =
combine_option_bitmap(&[left.data_ref(), right.data_ref()], left.len());

let nulls = NullBuffer::union(left.nulls(), right.nulls());
let buffer = MutableBuffer::collect_bool(left.len(), |i| unsafe {
// SAFETY: i in range 0..len
op(left.value_unchecked(i), right.value_unchecked(i))
});

let data = unsafe {
ArrayData::new_unchecked(
DataType::Boolean,
left.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
vec![],
)
ArrayDataBuilder::new(DataType::Boolean)
.len(left.len())
.nulls(nulls)
.buffers(vec![buffer.into()])
.build_unchecked()
};
Self::from(data)
}
Expand Down Expand Up @@ -470,7 +462,7 @@ mod tests {
assert_eq!(4, arr.len());
assert_eq!(0, arr.offset());
assert_eq!(0, arr.null_count());
assert!(arr.data().nulls().is_none());
assert!(arr.nulls().is_none());
for i in 0..3 {
assert!(!arr.is_null(i));
assert!(arr.is_valid(i));
Expand All @@ -485,7 +477,7 @@ mod tests {
assert_eq!(4, arr.len());
assert_eq!(0, arr.offset());
assert_eq!(2, arr.null_count());
assert!(arr.data().nulls().is_some());
assert!(arr.nulls().is_some());

assert!(arr.is_valid(0));
assert!(arr.is_null(1));
Expand Down
54 changes: 27 additions & 27 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use crate::timezone::Tz;
use crate::trusted_len::trusted_len_unzip;
use crate::types::*;
use crate::{Array, ArrayAccessor, ArrayRef};
use arrow_buffer::{i256, ArrowNativeType, Buffer, NullBuffer, ScalarBuffer};
use arrow_buffer::{
i256, ArrowNativeType, BooleanBuffer, Buffer, NullBuffer, ScalarBuffer,
};
use arrow_data::bit_iterator::try_for_each_valid_idx;
use arrow_data::ArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime};
use half::f16;
Expand Down Expand Up @@ -344,7 +346,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
pub fn from_value(value: T::Native, count: usize) -> Self {
unsafe {
let val_buf = Buffer::from_trusted_len_iter((0..count).map(|_| value));
build_primitive_array(count, val_buf, 0, None)
build_primitive_array(count, val_buf, None)
}
}

Expand Down Expand Up @@ -421,17 +423,16 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
{
let data = self.data();
let len = self.len();
let null_count = self.null_count();

let null_buffer = data.nulls().map(|b| b.inner().sliced());
let nulls = data.nulls().cloned();
let values = self.values().iter().map(|v| op(*v));
// JUSTIFICATION
// Benefit
// ~60% speedup
// Soundness
// `values` is an iterator with a known size because arrays are sized.
let buffer = unsafe { Buffer::from_trusted_len_iter(values) };
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
unsafe { build_primitive_array(len, buffer, nulls) }
}

/// Applies an unary and infallible function to a mutable primitive array.
Expand Down Expand Up @@ -478,21 +479,23 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
{
let data = self.data();
let len = self.len();
let null_count = self.null_count();

let null_buffer = data.nulls().map(|b| b.inner().sliced());
let nulls = data.nulls().cloned();
let mut buffer = BufferBuilder::<O::Native>::new(len);
buffer.append_n_zeroed(len);
let slice = buffer.as_slice_mut();

try_for_each_valid_idx(len, 0, null_count, null_buffer.as_deref(), |idx| {
let f = |idx| {
unsafe { *slice.get_unchecked_mut(idx) = op(self.value_unchecked(idx))? };
Ok::<_, E>(())
})?;
};

Ok(unsafe {
build_primitive_array(len, buffer.finish(), null_count, null_buffer)
})
match &nulls {
Some(nulls) => nulls.try_for_each_valid_idx(f)?,
None => (0..len).try_for_each(f)?,
}

Ok(unsafe { build_primitive_array(len, buffer.finish(), nulls) })
}

/// Applies an unary and fallible function to all valid values in a mutable primitive array.
Expand Down Expand Up @@ -575,12 +578,12 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
Ok::<_, ()>(())
});

let nulls = BooleanBuffer::new(null_builder.finish(), 0, len);
unsafe {
build_primitive_array(
len,
buffer.finish(),
out_null_count,
Some(null_builder.finish()),
Some(NullBuffer::new_unchecked(nulls, out_null_count)),
)
}
}
Expand Down Expand Up @@ -648,18 +651,15 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe fn build_primitive_array<O: ArrowPrimitiveType>(
len: usize,
buffer: Buffer,
null_count: usize,
null_buffer: Option<Buffer>,
nulls: Option<NullBuffer>,
) -> PrimitiveArray<O> {
PrimitiveArray::from(ArrayData::new_unchecked(
O::DATA_TYPE,
len,
Some(null_count),
null_buffer,
0,
vec![buffer],
vec![],
))
PrimitiveArray::from(
ArrayDataBuilder::new(O::DATA_TYPE)
.len(len)
.buffers(vec![buffer])
.nulls(nulls)
.build_unchecked(),
)
}

impl<T: ArrowPrimitiveType> From<PrimitiveArray<T>> for ArrayData {
Expand Down Expand Up @@ -1782,7 +1782,7 @@ mod tests {
let primitive_array = PrimitiveArray::<Int32Type>::from_iter(iter);
assert_eq!(primitive_array.len(), 10);
assert_eq!(primitive_array.null_count(), 0);
assert!(primitive_array.data().nulls().is_none());
assert!(primitive_array.nulls().is_none());
assert_eq!(primitive_array.values(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
}

Expand Down
4 changes: 2 additions & 2 deletions arrow-array/src/builder/generic_bytes_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ mod tests {
builder.append_value("parquet");
let arr = builder.finish();
// array should not have null buffer because there is not `null` value.
assert!(arr.data().nulls().is_none());
assert!(arr.nulls().is_none());
assert_eq!(GenericStringArray::<O>::from(vec!["arrow", "parquet"]), arr,)
}

Expand Down Expand Up @@ -454,7 +454,7 @@ mod tests {
builder.append_value("parquet");
arr = builder.finish();

assert!(arr.data().nulls().is_some());
assert!(arr.nulls().is_some());
assert_eq!(&[O::zero()], builder.offsets_slice());
assert_eq!(5, arr.len());
}
Expand Down
Loading

0 comments on commit f59c889

Please sign in to comment.