Skip to content

Commit

Permalink
Add Scalar/Datum abstraction (apache#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jun 9, 2023
1 parent 2846cde commit c577e76
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 41 deletions.
11 changes: 11 additions & 0 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ impl BooleanArray {
Self { values, nulls }
}

/// Builds a [`BooleanArray`] holding `len` elements, every one of which is null.
///
/// The values are backed by a zeroed buffer of `bit_util::ceil(len, 8)` bytes, and the
/// validity information comes from [`NullBuffer::new_null`].
pub fn new_null(len: usize) -> Self {
    // Size the backing allocation in bytes for `len` single-bit values.
    let byte_len = bit_util::ceil(len, 8);
    let zeroed = MutableBuffer::from_len_zeroed(byte_len);
    Self {
        values: BooleanBuffer::new(zeroed.into(), 0, len),
        nulls: Some(NullBuffer::new_null(len)),
    }
}

/// Returns the length of this array.
pub fn len(&self) -> usize {
self.values.len()
Expand Down
56 changes: 28 additions & 28 deletions arrow-array/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ macro_rules! repeat_pat {
/// [`DataType`]: arrow_schema::DataType
#[macro_export]
macro_rules! downcast_integer {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($($p:pat),+ => $fallback:expr $(,)*)*) => {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => {
match ($($data_type),+) {
$crate::repeat_pat!(arrow_schema::DataType::Int8, $($data_type),+) => {
$m!($crate::types::Int8Type $(, $args)*)
Expand All @@ -90,7 +90,7 @@ macro_rules! downcast_integer {
$crate::repeat_pat!(arrow_schema::DataType::UInt64, $($data_type),+) => {
$m!($crate::types::UInt64Type $(, $args)*)
}
$(($($p),+) => $fallback,)*
$($p => $fallback,)*
}
};
}
Expand Down Expand Up @@ -127,7 +127,7 @@ macro_rules! downcast_integer {
/// [`DataType`]: arrow_schema::DataType
#[macro_export]
macro_rules! downcast_run_end_index {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($($p:pat),+ => $fallback:expr $(,)*)*) => {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => {
match ($($data_type),+) {
$crate::repeat_pat!(arrow_schema::DataType::Int16, $($data_type),+) => {
$m!($crate::types::Int16Type $(, $args)*)
Expand All @@ -138,7 +138,7 @@ macro_rules! downcast_run_end_index {
$crate::repeat_pat!(arrow_schema::DataType::Int64, $($data_type),+) => {
$m!($crate::types::Int64Type $(, $args)*)
}
$(($($p),+) => $fallback,)*
$($p => $fallback,)*
}
};
}
Expand Down Expand Up @@ -170,7 +170,7 @@ macro_rules! downcast_run_end_index {
/// [`DataType`]: arrow_schema::DataType
#[macro_export]
macro_rules! downcast_temporal {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($($p:pat),+ => $fallback:expr $(,)*)*) => {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => {
match ($($data_type),+) {
$crate::repeat_pat!(arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second), $($data_type),+) => {
$m!($crate::types::Time32SecondType $(, $args)*)
Expand Down Expand Up @@ -202,7 +202,7 @@ macro_rules! downcast_temporal {
$crate::repeat_pat!(arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _), $($data_type),+) => {
$m!($crate::types::TimestampNanosecondType $(, $args)*)
}
$(($($p),+) => $fallback,)*
$($p => $fallback,)*
}
};
}
Expand Down Expand Up @@ -237,16 +237,16 @@ macro_rules! downcast_temporal_array {
($values:ident => $e:expr, $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal_array!($values => {$e} $($p => $fallback)*)
};
(($($values:ident),+) => $e:block $($($p:pat),+ => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal_array!($($values),+ => $e $($($p),+ => $fallback)*)
(($($values:ident),+) => $e:expr, $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal_array!($($values),+ => {$e} $($p => $fallback)*)
};
(($($values:ident),+) => $e:block $(($($p:pat),+) => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal_array!($($values),+ => $e $($($p),+ => $fallback)*)
($($values:ident),+ => $e:block $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal_array!(($($values),+) => $e $($p => $fallback)*)
};
($($values:ident),+ => $e:block $($($p:pat),+ => $fallback:expr $(,)*)*) => {
(($($values:ident),+) => $e:block $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_temporal!{
$($values.data_type()),+ => ($crate::downcast_primitive_array_helper, $($values),+, $e),
$($($p),+ => $fallback,)*
$($p => $fallback,)*
}
};
}
Expand Down Expand Up @@ -281,7 +281,7 @@ macro_rules! downcast_temporal_array {
/// [`DataType`]: arrow_schema::DataType
#[macro_export]
macro_rules! downcast_primitive {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($($p:pat),+ => $fallback:expr $(,)*)*) => {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_integer! {
$($data_type),+ => ($m $(, $args)*),
$crate::repeat_pat!(arrow_schema::DataType::Float16, $($data_type),+) => {
Expand Down Expand Up @@ -323,7 +323,7 @@ macro_rules! downcast_primitive {
_ => {
$crate::downcast_temporal! {
$($data_type),+ => ($m $(, $args)*),
$($($p),+ => $fallback,)*
$($p => $fallback,)*
}
}
}
Expand Down Expand Up @@ -369,16 +369,16 @@ macro_rules! downcast_primitive_array {
($values:ident => $e:expr, $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive_array!($values => {$e} $($p => $fallback)*)
};
(($($values:ident),+) => $e:block $($($p:pat),+ => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive_array!($($values),+ => $e $($($p),+ => $fallback)*)
(($($values:ident),+) => $e:expr, $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive_array!($($values),+ => {$e} $($p => $fallback)*)
};
(($($values:ident),+) => $e:block $(($($p:pat),+) => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive_array!($($values),+ => $e $($($p),+ => $fallback)*)
($($values:ident),+ => $e:block $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive_array!(($($values),+) => $e $($p => $fallback)*)
};
($($values:ident),+ => $e:block $($($p:pat),+ => $fallback:expr $(,)*)*) => {
(($($values:ident),+) => $e:block $($p:pat => $fallback:expr $(,)*)*) => {
$crate::downcast_primitive!{
$($values.data_type()),+ => ($crate::downcast_primitive_array_helper, $($values),+, $e),
$($($p),+ => $fallback,)*
$($p => $fallback,)*
}
};
}
Expand Down Expand Up @@ -577,7 +577,7 @@ macro_rules! downcast_run_array {
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`GenericListArray<T>`], panic'ing on failure.
/// [`GenericListArray<T>`], panicking on failure.
pub fn as_generic_list_array<S: OffsetSizeTrait>(
arr: &dyn Array,
) -> &GenericListArray<S> {
Expand All @@ -587,14 +587,14 @@ pub fn as_generic_list_array<S: OffsetSizeTrait>(
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`ListArray`], panic'ing on failure.
/// [`ListArray`], panicking on failure.
#[inline]
pub fn as_list_array(arr: &dyn Array) -> &ListArray {
// Thin convenience wrapper: instantiates the generic list downcast with `i32` offsets and
// returns its result as `&ListArray`; all of the actual work is done by the generic helper.
as_generic_list_array::<i32>(arr)
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`FixedSizeListArray`], panic'ing on failure.
/// [`FixedSizeListArray`], panicking on failure.
#[inline]
pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
arr.as_any()
Expand All @@ -603,14 +603,14 @@ pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`LargeListArray`], panic'ing on failure.
/// [`LargeListArray`], panicking on failure.
#[inline]
pub fn as_large_list_array(arr: &dyn Array) -> &LargeListArray {
// Thin convenience wrapper: instantiates the generic list downcast with `i64` offsets and
// returns its result as `&LargeListArray`; all of the actual work is done by the generic helper.
as_generic_list_array::<i64>(arr)
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`GenericBinaryArray<S>`], panic'ing on failure.
/// [`GenericBinaryArray<S>`], panicking on failure.
#[inline]
pub fn as_generic_binary_array<S: OffsetSizeTrait>(
arr: &dyn Array,
Expand All @@ -621,7 +621,7 @@ pub fn as_generic_binary_array<S: OffsetSizeTrait>(
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`StringArray`], panic'ing on failure.
/// [`StringArray`], panicking on failure.
///
/// # Example
///
Expand All @@ -640,7 +640,7 @@ pub fn as_string_array(arr: &dyn Array) -> &StringArray {
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`] to
/// [`BooleanArray`], panic'ing on failure.
/// [`BooleanArray`], panicking on failure.
///
/// # Example
///
Expand Down Expand Up @@ -675,7 +675,7 @@ macro_rules! array_downcast_fn {
array_downcast_fn!(
$name,
$arrty,
concat!("[`", stringify!($arrty), "`], panic'ing on failure.")
concat!("[`", stringify!($arrty), "`], panicking on failure.")
);
};
}
Expand Down
3 changes: 3 additions & 0 deletions arrow-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ pub use arithmetic::ArrowNativeTypeOp;
mod numeric;
pub use numeric::*;

mod scalar;
pub use scalar::*;

pub mod builder;
pub mod cast;
mod delta;
Expand Down
116 changes: 116 additions & 0 deletions arrow-array/src/scalar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::{
downcast_primitive_array, Array, ArrowPrimitiveType, BooleanArray, Int32Array,
PrimitiveArray,
};
use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer};
use arrow_schema::ArrowError;

/// A possibly [`Scalar`] [`Array`]
///
/// This allows optimised binary kernels where one or more arguments are constant:
/// a kernel accepts `&dyn Datum`, calls [`Datum::get`], and uses the returned flag to
/// decide whether the argument should be treated as a single value rather than
/// processed element by element.
///
/// # Example
///
/// ```
/// # use arrow_array::*;
/// # use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer};
/// # use arrow_schema::ArrowError;
/// #
/// fn eq_impl<T: ArrowPrimitiveType>(
///     a: &PrimitiveArray<T>,
///     a_scalar: bool,
///     b: &PrimitiveArray<T>,
///     b_scalar: bool,
/// ) -> BooleanArray {
///     let (array, scalar) = match (a_scalar, b_scalar) {
///         (true, true) | (false, false) => {
///             let len = a.len().min(b.len());
///             let nulls = NullBuffer::union(a.nulls(), b.nulls());
///             let buffer =
///                 BooleanBuffer::collect_bool(len, |idx| a.value(idx) == b.value(idx));
///             return BooleanArray::new(buffer, nulls);
///         }
///         (true, false) => (b, (a.null_count() == 0).then(|| a.value(0))),
///         (false, true) => (a, (b.null_count() == 0).then(|| b.value(0))),
///     };
///     match scalar {
///         Some(v) => {
///             let len = array.len();
///             let nulls = array.nulls().cloned();
///             let buffer = MutableBuffer::collect_bool(len, |idx| array.value(idx) == v);
///             BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), nulls)
///         }
///         None => BooleanArray::new_null(array.len()),
///     }
/// }
///
/// pub fn eq(l: &dyn Datum, r: &dyn Datum) -> Result<BooleanArray, ArrowError> {
///     let (l_array, l_scalar) = l.get();
///     let (r_array, r_scalar) = r.get();
///     downcast_primitive_array!(
///         (l_array, r_array) => Ok(eq_impl(l_array, l_scalar, r_array, r_scalar)),
///         (a, b) => Err(ArrowError::NotYetImplemented(format!("{a} == {b}"))),
///     )
/// }
///
/// // Comparison of two arrays
/// let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let b = Int32Array::from(vec![1, 2, 4, 7, 3]);
/// let r = eq(&a, &b).unwrap();
/// let values: Vec<_> = r.values().iter().collect();
/// // Only the first two positions hold equal values (index 2 compares 3 with 4).
/// assert_eq!(values, &[true, true, false, false, false]);
///
/// // Comparison of an array and a scalar
/// let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let b = Int32Array::from(vec![1]);
/// let r = eq(&a, &Scalar::new(&b)).unwrap();
/// let values: Vec<_> = r.values().iter().collect();
/// assert_eq!(values, &[true, false, false, false, false]);
/// ```
pub trait Datum {
    /// Returns the value for this [`Datum`] and a boolean indicating if the value is scalar
    ///
    /// The flag is `true` when the returned [`Array`] should be treated by kernels as a
    /// single scalar value, and `false` when it is an ordinary array.
    fn get(&self) -> (&dyn Array, bool);
}

/// Every [`Array`] is a [`Datum`] that reports itself as non-scalar.
impl<T: Array> Datum for T {
    fn get(&self) -> (&dyn Array, bool) {
        // Unsize to a trait object explicitly; `false` marks this as a regular array.
        let array: &dyn Array = self;
        (array, false)
    }
}

/// Marks a single-element [`Array`] as a scalar, so that kernels treat it as one value
/// instead of processing it as an array.
///
/// See [`Datum`] for more information
pub struct Scalar<'a>(&'a dyn Array);

impl<'a> Scalar<'a> {
    /// Wraps `array` as a [`Scalar`]
    ///
    /// # Panics
    ///
    /// Panics if `array.len() != 1`
    pub fn new(array: &'a dyn Array) -> Self {
        let len = array.len();
        assert_eq!(len, 1);
        Self(array)
    }
}

/// A [`Scalar`] hands back its wrapped array and reports itself as scalar.
impl Datum for Scalar<'_> {
    fn get(&self) -> (&dyn Array, bool) {
        let Self(array) = self;
        (*array, true)
    }
}
10 changes: 10 additions & 0 deletions arrow-buffer/src/buffer/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ impl BooleanBuffer {
}
}

/// Create a new [`BooleanBuffer`] of length `len` where all values are `false`
///
/// The backing storage is obtained from `MutableBuffer::new_null(len)` and the
/// resulting buffer starts at bit offset `0`.
pub fn new_zeroed(len: usize) -> Self {
    Self {
        buffer: MutableBuffer::new_null(len).into_buffer(),
        offset: 0,
        len,
    }
}

/// Invokes `f` with indexes `0..len` collecting the boolean results into a new `BooleanBuffer`
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, f: F) -> Self {
let buffer = MutableBuffer::collect_bool(len, f);
Expand Down
4 changes: 1 addition & 3 deletions arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ impl NullBuffer {

/// Create a new [`NullBuffer`] of length `len` where all values are null
pub fn new_null(len: usize) -> Self {
let buffer = MutableBuffer::new_null(len).into_buffer();
let buffer = BooleanBuffer::new(buffer, 0, len);
Self {
buffer,
buffer: BooleanBuffer::new_zeroed(len),
null_count: len,
}
}
Expand Down
10 changes: 0 additions & 10 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,6 @@ fn filter_array(
// actually filter
_ => downcast_primitive_array! {
values => Ok(Arc::new(filter_primitive(values, predicate))),
DataType::Decimal128(p, s) => {
let values = values.as_any().downcast_ref::<Decimal128Array>().unwrap();
let filtered = filter_primitive(values, predicate);
Ok(Arc::new(filtered.with_precision_and_scale(*p, *s).unwrap()))
}
DataType::Decimal256(p, s) => {
let values = values.as_any().downcast_ref::<Decimal256Array>().unwrap();
let filtered = filter_primitive(values, predicate);
Ok(Arc::new(filtered.with_precision_and_scale(*p, *s).unwrap()))
}
DataType::Boolean => {
let values = values.as_any().downcast_ref::<BooleanArray>().unwrap();
Ok(Arc::new(filter_boolean(values, predicate)))
Expand Down

0 comments on commit c577e76

Please sign in to comment.