Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Array::logical_nulls to only copy when necessary #5209

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions arrow-arith/src/arity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ where
return Ok(PrimitiveArray::from(ArrayData::new_empty(&O::DATA_TYPE)));
}

let nulls = NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref());
let nulls = NullBuffer::union(
a.logical_nulls().as_ref().map(|n| n.as_ref()),
b.logical_nulls().as_ref().map(|n| n.as_ref()),
);

let values = a.values().iter().zip(b.values()).map(|(l, r)| op(*l, *r));
// JUSTIFICATION
Expand Down Expand Up @@ -244,7 +247,10 @@ where
))));
}

let nulls = NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref());
let nulls = NullBuffer::union(
a.logical_nulls().as_ref().map(|n| n.as_ref()),
b.logical_nulls().as_ref().map(|n| n.as_ref()),
);

let mut builder = a.into_builder()?;

Expand Down Expand Up @@ -292,8 +298,11 @@ where
if a.null_count() == 0 && b.null_count() == 0 {
try_binary_no_nulls(len, a, b, op)
} else {
let nulls =
NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref()).unwrap();
let nulls = NullBuffer::union(
a.logical_nulls().as_ref().map(|n| n.as_ref()),
b.logical_nulls().as_ref().map(|n| n.as_ref()),
)
.unwrap();

let mut buffer = BufferBuilder::<O::Native>::new(len);
buffer.append_n_zeroed(len);
Expand Down Expand Up @@ -351,8 +360,11 @@ where
if a.null_count() == 0 && b.null_count() == 0 {
try_binary_no_nulls_mut(len, a, b, op)
} else {
let nulls =
NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref()).unwrap();
let nulls = NullBuffer::union(
a.logical_nulls().as_ref().map(|n| n.as_ref()),
b.logical_nulls().as_ref().map(|n| n.as_ref()),
)
.unwrap();

let mut builder = a.into_builder()?;

Expand Down
6 changes: 3 additions & 3 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl BooleanArray {
where
F: FnMut(T::Item) -> bool,
{
let nulls = left.logical_nulls();
let nulls = left.logical_nulls().map(|n| n.into_owned());
let values = BooleanBuffer::collect_bool(left.len(), |i| unsafe {
// SAFETY: i in range 0..len
op(left.value_unchecked(i))
Expand Down Expand Up @@ -245,8 +245,8 @@ impl BooleanArray {
assert_eq!(left.len(), right.len());

let nulls = NullBuffer::union(
left.logical_nulls().as_ref(),
right.logical_nulls().as_ref(),
left.logical_nulls().as_ref().map(|n| n.as_ref()),
right.logical_nulls().as_ref().map(|n| n.as_ref()),
);
let values = BooleanBuffer::collect_bool(left.len(), |i| unsafe {
// SAFETY: i in range 0..len
Expand Down
10 changes: 6 additions & 4 deletions arrow-array/src/array/dictionary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

/// A [`DictionaryArray`] indexed by `i8`
Expand Down Expand Up @@ -720,8 +721,8 @@ impl<T: ArrowDictionaryKeyType> Array for DictionaryArray<T> {
self.keys.nulls()
}

fn logical_nulls(&self) -> Option<NullBuffer> {
match self.values.nulls() {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
let logical_nulls = match self.values.nulls() {
None => self.nulls().cloned(),
Some(value_nulls) => {
let mut builder = BooleanBufferBuilder::new(self.len());
Expand All @@ -738,7 +739,8 @@ impl<T: ArrowDictionaryKeyType> Array for DictionaryArray<T> {
}
Some(builder.finish().into())
}
}
};
logical_nulls.map(Cow::Owned)
}

fn is_nullable(&self) -> bool {
Expand Down Expand Up @@ -854,7 +856,7 @@ impl<'a, K: ArrowDictionaryKeyType, V: Sync> Array for TypedDictionaryArray<'a,
self.dictionary.nulls()
}

fn logical_nulls(&self) -> Option<NullBuffer> {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
self.dictionary.logical_nulls()
}

Expand Down
87 changes: 49 additions & 38 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,45 @@

//! The concrete array definitions

mod binary_array;
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use crate::types::*;
use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use std::any::Any;
use std::sync::Arc;

pub use binary_array::*;

mod boolean_array;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of this re-organisation; it makes it harder to notice if you've missed a re-export.

Copy link
Contributor Author

@alamb alamb Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't do it on purpose -- my editor must have done it. I will revert it if we proceed with this PR.

pub use boolean_array::*;

mod byte_array;
pub use byte_array::*;

mod dictionary_array;
pub use dictionary_array::*;

mod fixed_size_binary_array;
pub use fixed_size_binary_array::*;

mod fixed_size_list_array;
pub use fixed_size_list_array::*;

mod list_array;
pub use list_array::*;

mod map_array;
pub use map_array::*;

mod null_array;
pub use null_array::*;

mod primitive_array;
pub use primitive_array::*;

mod string_array;
pub use run_array::*;
pub use string_array::*;

mod struct_array;
pub use struct_array::*;

mod union_array;
pub use union_array::*;

use crate::types::*;

mod binary_array;

mod boolean_array;
mod byte_array;
mod dictionary_array;
mod fixed_size_binary_array;
mod fixed_size_list_array;
mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod run_array;
pub use run_array::*;
mod string_array;
mod struct_array;
mod union_array;

/// An array in the [arrow columnar format](https://arrow.apache.org/docs/format/Columnar.html)
pub trait Array: std::fmt::Debug + Send + Sync {
Expand Down Expand Up @@ -182,7 +172,8 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// you can use the slower [`Array::logical_nulls`] to obtain a computed mask.
fn nulls(&self) -> Option<&NullBuffer>;

/// Returns a potentially computed [`NullBuffer`] that represent the logical null values of this array, if any.
/// Returns a potentially computed [`NullBuffer`] that represents the logical
/// null values of this array, if any.
///
/// In most cases this will be the same as [`Array::nulls`], except for:
///
Expand All @@ -192,8 +183,26 @@ pub trait Array: std::fmt::Debug + Send + Sync {
///
/// In these cases a logical [`NullBuffer`] will be computed, encoding the logical nullability
/// of these arrays, beyond what is encoded in [`Array::nulls`]
fn logical_nulls(&self) -> Option<NullBuffer> {
self.nulls().cloned()
///
/// Returns a [`Cow<'_, NullBuffer>`] to avoid a copy of the null buffer
/// when it is already present.
///
/// # Example:
/// ```
/// # use arrow_array::{Array, Int32Array};
/// # use arrow_buffer::NullBuffer;
/// # let array: Int32Array = [Some(1), None, Some(3)].into_iter().collect();
///
/// // to get an Option<&NullBuffer> requires using `as_ref`:
/// let logical_nulls = array.logical_nulls();
/// let logical_nulls_ref: Option<&NullBuffer> = logical_nulls.as_ref().map(|n| n.as_ref());
///
/// // use into_owned to get an owned `NullBuffer`
/// let logical_nulls = array.logical_nulls();
/// let logical_nulls_owned: Option<NullBuffer> = logical_nulls.map(|n| n.into_owned());
/// ```
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
self.nulls().map(Cow::Borrowed)
}

/// Returns whether the element at `index` is null according to [`Array::nulls`]
Expand Down Expand Up @@ -321,7 +330,7 @@ impl Array for ArrayRef {
self.as_ref().nulls()
}

fn logical_nulls(&self) -> Option<NullBuffer> {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
self.as_ref().logical_nulls()
}

Expand Down Expand Up @@ -387,7 +396,7 @@ impl<'a, T: Array> Array for &'a T {
T::nulls(self)
}

fn logical_nulls(&self) -> Option<NullBuffer> {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
T::logical_nulls(self)
}

Expand Down Expand Up @@ -705,12 +714,14 @@ where

#[cfg(test)]
mod tests {
use super::*;
use crate::cast::{as_union_array, downcast_array};
use crate::downcast_run_array;
use arrow_buffer::MutableBuffer;
use arrow_schema::{Field, Fields, UnionFields, UnionMode};

use crate::cast::{as_union_array, downcast_array};
use crate::downcast_run_array;

use super::*;

#[test]
fn test_empty_primitive() {
let array = new_empty_array(&DataType::Int32);
Expand Down
5 changes: 3 additions & 2 deletions arrow-array/src/array/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow_buffer::buffer::NullBuffer;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::DataType;
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

/// An array of [null values](https://arrow.apache.org/docs/format/Columnar.html#null-layout)
Expand Down Expand Up @@ -109,8 +110,8 @@ impl Array for NullArray {
None
}

fn logical_nulls(&self) -> Option<NullBuffer> {
(self.len != 0).then(|| NullBuffer::new_null(self.len))
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
(self.len != 0).then(|| Cow::Owned(NullBuffer::new_null(self.len)))
}

fn is_nullable(&self) -> bool {
Expand Down
10 changes: 6 additions & 4 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, RunEndBuffer};
Expand Down Expand Up @@ -339,7 +340,7 @@ impl<T: RunEndIndexType> Array for RunArray<T> {
None
}

fn logical_nulls(&self) -> Option<NullBuffer> {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
let len = self.len();
let nulls = self.values.logical_nulls()?;
let mut out = BooleanBufferBuilder::new(len);
Expand Down Expand Up @@ -369,7 +370,7 @@ impl<T: RunEndIndexType> Array for RunArray<T> {
}
// Sanity check
assert_eq!(out.len(), len);
Some(out.finish().into())
Some(Cow::Owned(out.finish().into()))
}

fn is_nullable(&self) -> bool {
Expand Down Expand Up @@ -593,7 +594,7 @@ impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {
self.run_array.nulls()
}

fn logical_nulls(&self) -> Option<NullBuffer> {
fn logical_nulls(&self) -> Option<Cow<'_, NullBuffer>> {
self.run_array.logical_nulls()
}

Expand Down Expand Up @@ -660,12 +661,13 @@ mod tests {
use rand::thread_rng;
use rand::Rng;

use super::*;
use crate::builder::PrimitiveRunBuilder;
use crate::cast::AsArray;
use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
use crate::{Array, Int32Array, StringArray};

use super::*;

fn build_input_array(size: usize) -> Vec<Option<i32>> {
// The input array is created by shuffling and repeating
// the seed values random number of times.
Expand Down
5 changes: 3 additions & 2 deletions arrow-array/src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

//! Idiomatic iterators for [`Array`](crate::Array)

use arrow_buffer::NullBuffer;

use crate::array::{
ArrayAccessor, BooleanArray, FixedSizeBinaryArray, GenericBinaryArray, GenericListArray,
GenericStringArray, PrimitiveArray,
};
use crate::{FixedSizeListArray, MapArray};
use arrow_buffer::NullBuffer;

/// An iterator that returns Some(T) or None, that can be used on any [`ArrayAccessor`]
///
Expand Down Expand Up @@ -56,7 +57,7 @@ impl<T: ArrayAccessor> ArrayIter<T> {
/// create a new iterator
pub fn new(array: T) -> Self {
let len = array.len();
let logical_nulls = array.logical_nulls();
let logical_nulls = array.logical_nulls().map(|n| n.into_owned());
ArrayIter {
array,
logical_nulls,
Expand Down
6 changes: 3 additions & 3 deletions arrow-ord/src/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray,
(Some(_), true, Some(a), false) | (Some(a), false, Some(_), true) => {
// Scalar is null, other side is non-scalar and nullable
match op {
Op::Distinct => a.into_inner().into(),
Op::NotDistinct => a.into_inner().not().into(),
Op::Distinct => a.into_owned().into_inner().into(),
Op::NotDistinct => a.into_owned().into_inner().not().into(),
_ => BooleanArray::new_null(len),
}
}
Expand All @@ -276,7 +276,7 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray,
BooleanBuffer::new(buffer, 0, len).into()
}
Op::NotDistinct => (nulls.inner() & &values()).into(),
_ => BooleanArray::new(values(), Some(nulls)),
_ => BooleanArray::new(values(), Some(nulls.into_owned())),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion arrow-ord/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ impl LexicographicalComparator {
// flatten and convert build comparators
let values = column.values.as_ref();
Ok((
values.logical_nulls(),
values.logical_nulls().map(|n| n.into_owned()),
build_compare(values, values)?,
column.options.unwrap_or_default(),
))
Expand Down
2 changes: 1 addition & 1 deletion arrow-select/src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
for (idx, dictionary) in dictionaries.iter().enumerate() {
let mask = masks.and_then(|m| m.get(idx));
let key_mask = match (dictionary.logical_nulls(), mask) {
(Some(n), None) => Some(n.into_inner()),
(Some(n), None) => Some(n.into_owned().into_inner()),
(None, Some(n)) => Some(n.clone()),
(Some(n), Some(m)) => Some(n.inner() & m),
(None, None) => None,
Expand Down
Loading