Skip to content

Commit

Permalink
feat: Add fill_null compute function (#1590)
Browse files Browse the repository at this point in the history
Co-authored-by: Will Manning <[email protected]>
  • Loading branch information
robert3005 and lwwmanning authored Dec 6, 2024
1 parent bb83f9b commit c774557
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 75 deletions.
12 changes: 5 additions & 7 deletions encodings/dict/src/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vortex_array::array::ConstantArray;
use vortex_array::compute::{compare, CompareFn, Operator};
use vortex_array::{ArrayData, IntoArrayData};
use vortex_array::compute::{compare, take, CompareFn, Operator, TakeOptions};
use vortex_array::ArrayData;
use vortex_error::VortexResult;

use crate::{DictArray, DictEncoding};
Expand All @@ -15,14 +15,12 @@ impl CompareFn<DictArray> for DictEncoding {
// If the RHS is constant, then we just need to compare against our encoded values.
if let Some(const_scalar) = rhs.as_constant() {
// Ensure the other is the same length as the dictionary
return compare(
let compare_result = compare(
lhs.values(),
ConstantArray::new(const_scalar, lhs.values().len()),
operator,
)
.and_then(|values| DictArray::try_new(lhs.codes(), values))
.map(|a| a.into_array())
.map(Some);
)?;
return take(compare_result, lhs.codes(), TakeOptions::default()).map(Some);
}

// It's a little more complex, but we could perform a comparison against the dictionary
Expand Down
File renamed without changes.
65 changes: 65 additions & 0 deletions vortex-array/src/array/bool/compute/fill_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::array::{BoolArray, BoolEncoding, ConstantArray};
use crate::compute::FillNullFn;
use crate::validity::Validity;
use crate::{ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant};

impl FillNullFn<BoolArray> for BoolEncoding {
fn fill_null(&self, array: &BoolArray, fill_value: Scalar) -> VortexResult<ArrayData> {
let fill = fill_value
.as_bool()
.value()
.ok_or_else(|| vortex_err!("Fill value must be non null"))?;

Ok(match array.validity() {
Validity::NonNullable => array.clone().into_array(),
Validity::AllValid => BoolArray::from(array.boolean_buffer()).into_array(),
Validity::AllInvalid => ConstantArray::new(fill, array.len()).into_array(),
Validity::Array(v) => {
let bool_buffer = if fill {
&array.boolean_buffer() | &!&v.into_bool()?.boolean_buffer()
} else {
&array.boolean_buffer() & &v.into_bool()?.boolean_buffer()
};
BoolArray::from(bool_buffer).into_array()
}
})
}
}

#[cfg(test)]
mod tests {
use arrow_buffer::BooleanBuffer;
use rstest::rstest;
use vortex_dtype::{DType, Nullability};

use crate::array::BoolArray;
use crate::compute::fill_null;
use crate::validity::Validity;
use crate::{ArrayDType, IntoArrayVariant};

#[rstest]
#[case(true, vec![true, true, false, true])]
#[case(false, vec![true, false, false, false])]
fn bool_fill_null(#[case] fill_value: bool, #[case] expected: Vec<bool>) {
let bool_array = BoolArray::try_new(
BooleanBuffer::from_iter([true, true, false, false]),
Validity::from_iter([true, false, true, false]),
)
.unwrap();
let non_null_array = fill_null(bool_array, fill_value.into())
.unwrap()
.into_bool()
.unwrap();
assert_eq!(
non_null_array.boolean_buffer().iter().collect::<Vec<_>>(),
expected
);
assert_eq!(
non_null_array.dtype(),
&DType::Bool(Nullability::NonNullable)
);
}
}
10 changes: 8 additions & 2 deletions vortex-array/src/array/bool/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::array::BoolEncoding;
use crate::compute::{
BinaryBooleanFn, ComputeVTable, FillForwardFn, FilterFn, InvertFn, ScalarAtFn, SliceFn, TakeFn,
BinaryBooleanFn, ComputeVTable, FillForwardFn, FillNullFn, FilterFn, InvertFn, ScalarAtFn,
SliceFn, TakeFn,
};
use crate::ArrayData;

mod fill;
mod fill_forward;
mod fill_null;
pub mod filter;
mod flatten;
mod invert;
Expand Down Expand Up @@ -44,4 +46,8 @@ impl ComputeVTable for BoolEncoding {
fn take_fn(&self) -> Option<&dyn TakeFn<ArrayData>> {
Some(self)
}

fn fill_null_fn(&self) -> Option<&dyn FillNullFn<ArrayData>> {
Some(self)
}
}
47 changes: 47 additions & 0 deletions vortex-array/src/array/chunked/compute/fill_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::array::{ChunkedArray, ChunkedEncoding};
use crate::compute::{fill_null, FillNullFn};
use crate::{ArrayDType, ArrayData, IntoArrayData};

impl FillNullFn<ChunkedArray> for ChunkedEncoding {
fn fill_null(&self, array: &ChunkedArray, fill_value: Scalar) -> VortexResult<ArrayData> {
ChunkedArray::try_new(
array
.chunks()
.map(|c| fill_null(c, fill_value.clone()))
.collect::<VortexResult<Vec<_>>>()?,
array.dtype().as_nonnullable(),
)
.map(|a| a.into_array())
}
}

#[cfg(test)]
mod tests {
use arrow_buffer::BooleanBuffer;
use vortex_dtype::{DType, Nullability};

use crate::array::{BoolArray, ChunkedArray};
use crate::compute::fill_null;
use crate::validity::Validity;
use crate::{ArrayDType, IntoArrayData};

#[test]
fn fill_null_chunks() {
let chunked = ChunkedArray::try_new(
vec![
BoolArray::try_new(BooleanBuffer::new_set(5), Validity::AllInvalid)
.unwrap()
.into_array(),
BoolArray::new(BooleanBuffer::new_set(5), Nullability::Nullable).into_array(),
],
DType::Bool(Nullability::Nullable),
)
.unwrap();

let filled = fill_null(chunked, false.into()).unwrap();
assert_eq!(*filled.dtype(), DType::Bool(Nullability::NonNullable));
}
}
9 changes: 7 additions & 2 deletions vortex-array/src/array/chunked/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use vortex_error::VortexResult;
use crate::array::chunked::ChunkedArray;
use crate::array::ChunkedEncoding;
use crate::compute::{
try_cast, BinaryBooleanFn, CastFn, CompareFn, ComputeVTable, FilterFn, InvertFn, ScalarAtFn,
SliceFn, SubtractScalarFn, TakeFn,
try_cast, BinaryBooleanFn, CastFn, CompareFn, ComputeVTable, FillNullFn, FilterFn, InvertFn,
ScalarAtFn, SliceFn, SubtractScalarFn, TakeFn,
};
use crate::{ArrayData, IntoArrayData};

mod boolean;
mod compare;
mod fill_null;
mod filter;
mod invert;
mod scalar_at;
Expand Down Expand Up @@ -53,6 +54,10 @@ impl ComputeVTable for ChunkedEncoding {
fn take_fn(&self) -> Option<&dyn TakeFn<ArrayData>> {
Some(self)
}

fn fill_null_fn(&self) -> Option<&dyn FillNullFn<ArrayData>> {
Some(self)
}
}

impl CastFn<ChunkedArray> for ChunkedEncoding {
Expand Down
49 changes: 49 additions & 0 deletions vortex-array/src/compute/fill_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::Scalar;

use crate::encoding::Encoding;
use crate::{ArrayDType, ArrayData};

/// Implementation of fill_null for an encoding.
///
/// SAFETY: the fill value is guaranteed to be non-null.
pub trait FillNullFn<Array> {
fn fill_null(&self, array: &Array, fill_value: Scalar) -> VortexResult<ArrayData>;
}

impl<E: Encoding> FillNullFn<ArrayData> for E
where
E: FillNullFn<E::Array>,
for<'a> &'a E::Array: TryFrom<&'a ArrayData, Error = VortexError>,
{
fn fill_null(&self, array: &ArrayData, fill_value: Scalar) -> VortexResult<ArrayData> {
let array_ref = <&E::Array>::try_from(array)?;
let encoding = array
.encoding()
.as_any()
.downcast_ref::<E>()
.ok_or_else(|| vortex_err!("Mismatched encoding"))?;
FillNullFn::fill_null(encoding, array_ref, fill_value)
}
}

pub fn fill_null(array: impl AsRef<ArrayData>, fill_value: Scalar) -> VortexResult<ArrayData> {
let array = array.as_ref();
if !array.dtype().is_nullable() {
return Ok(array.clone());
}

if fill_value.is_null() {
vortex_bail!("Cannot fill_null with a null value")
}

if !array.dtype().eq_ignore_nullability(fill_value.dtype()) {
vortex_bail!(MismatchedTypes: array.dtype(), fill_value.dtype())
}

array
.encoding()
.fill_null_fn()
.map(|f| f.fill_null(array, fill_value))
.unwrap_or_else(|| Err(vortex_err!(NotImplemented: "fill_null", array.encoding().id())))
}
15 changes: 12 additions & 3 deletions vortex-array/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@ pub use boolean::{
pub use cast::{try_cast, CastFn};
pub use compare::{compare, scalar_cmp, CompareFn, Operator};
pub use fill_forward::{fill_forward, FillForwardFn};
pub use filter::*;
pub use fill_null::{fill_null, FillNullFn};
pub use filter::{filter, FilterFn, FilterIter, FilterMask};
pub use invert::{invert, InvertFn};
pub use like::*;
pub use like::{like, LikeFn, LikeOptions};
pub use scalar_at::{scalar_at, ScalarAtFn};
pub use scalar_subtract::{subtract_scalar, SubtractScalarFn};
pub use search_sorted::*;
pub use slice::{slice, SliceFn};
pub use take::*;
pub use take::{take, TakeFn, TakeOptions};

use crate::ArrayData;

mod boolean;
mod cast;
mod compare;
mod fill_forward;
mod fill_null;
mod filter;
mod invert;
mod like;
Expand Down Expand Up @@ -130,4 +132,11 @@ pub trait ComputeVTable {
fn take_fn(&self) -> Option<&dyn TakeFn<ArrayData>> {
None
}

/// Fill null values with given desired value. Resulting array is NonNullable
///
/// See: [FillNullFn]
fn fill_null_fn(&self) -> Option<&dyn FillNullFn<ArrayData>> {
None
}
}
1 change: 1 addition & 0 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ vortex-scalar = { workspace = true, features = ["flatbuffers"] }
arrow-schema = { workspace = true }
rstest = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-io = { path = "../vortex-io", features = ["tokio"] }

[lints]
workspace = true
Expand Down
47 changes: 5 additions & 42 deletions vortex-file/src/read/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use arrow_buffer::BooleanBuffer;
use itertools::Itertools;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{BoolArray, ConstantArray};
use vortex_array::compute::and_kleene;
use vortex_array::array::ConstantArray;
use vortex_array::compute::{and_kleene, fill_null};
use vortex_array::stats::ArrayStatistics;
use vortex_array::validity::Validity;
use vortex_array::{ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_array::{ArrayData, IntoArrayData};
use vortex_dtype::field::Field;
use vortex_error::{VortexExpect, VortexResult};
use vortex_expr::{split_conjunction, unbox_any, ExprRef, VortexExpr};
Expand Down Expand Up @@ -85,11 +83,11 @@ impl VortexExpr for RowFilter {

let new_mask = expr.evaluate(batch)?;
// Either `and` or `and_kleene` is fine. They only differ on `false AND null`, but
// null_as_false only cares which values are true.
// fill_null only cares which values are true.
mask = and_kleene(new_mask, mask)?;
}

null_as_false(mask.into_bool()?)
fill_null(mask, false.into())
}

fn collect_references<'a>(&'a self, references: &mut HashSet<&'a Field>) {
Expand Down Expand Up @@ -119,38 +117,3 @@ impl PartialEq<dyn Any> for RowFilter {
.unwrap_or(false)
}
}

pub fn null_as_false(array: BoolArray) -> VortexResult<ArrayData> {
Ok(match array.validity() {
Validity::NonNullable => array.into_array(),
Validity::AllValid => BoolArray::from(array.boolean_buffer()).into_array(),
Validity::AllInvalid => BoolArray::from(BooleanBuffer::new_unset(array.len())).into_array(),
Validity::Array(v) => {
let bool_buffer = &array.boolean_buffer() & &v.into_bool()?.boolean_buffer();
BoolArray::from(bool_buffer).into_array()
}
})
}

#[cfg(test)]
mod tests {
use vortex_array::array::BoolArray;
use vortex_array::validity::Validity;
use vortex_array::IntoArrayVariant;

use super::*;

#[test]
fn coerces_nulls() {
let bool_array = BoolArray::try_new(
BooleanBuffer::from_iter([true, true, false, false]),
Validity::from_iter([true, false, true, false]),
)
.unwrap();
let non_null_array = null_as_false(bool_array).unwrap().into_bool().unwrap();
assert_eq!(
non_null_array.boolean_buffer().iter().collect::<Vec<_>>(),
vec![true, false, false, false]
);
}
}
8 changes: 5 additions & 3 deletions vortex-file/src/read/layouts/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ impl LayoutReader for ColumnarLayoutReader {
}
BatchRead::Value(arr) => {
if self.shortcircuit_siblings
&& arr.statistics().compute_true_count().vortex_expect(
"must be a bool array if shortcircuit_siblings is set to true",
) == 0
&& arr
.statistics()
.compute_true_count()
.map(|true_count| true_count == 0)
.unwrap_or(false)
{
in_progress_guard.remove(&selection_range);
return Ok(None);
Expand Down
Loading

0 comments on commit c774557

Please sign in to comment.