diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 1e17e35d0f6d..69f6ba4d8de1 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -64,6 +64,9 @@ pub use struct_array::*; mod union_array; pub use union_array::*; +mod run_array; +pub use run_array::*; + /// Trait for dealing with different types of array at runtime when the type of the /// array is not known in advance. pub trait Array: std::fmt::Debug + Send + Sync { @@ -579,6 +582,20 @@ pub fn make_array(data: ArrayData) -> ArrayRef { } dt => panic!("Unexpected dictionary key type {:?}", dt), }, + DataType::RunEndEncoded(ref run_ends_type, _) => { + match run_ends_type.data_type() { + DataType::Int16 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + DataType::Int32 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + DataType::Int64 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + dt => panic!("Unexpected data type for run_ends array {:?}", dt), + } + } DataType::Null => Arc::new(NullArray::from(data)) as ArrayRef, DataType::Decimal128(_, _) => Arc::new(Decimal128Array::from(data)) as ArrayRef, DataType::Decimal256(_, _) => Arc::new(Decimal256Array::from(data)) as ArrayRef, @@ -737,6 +754,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef { new_null_sized_decimal(data_type, length, std::mem::size_of::()) } DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32), + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs new file mode 100644 index 000000000000..05b5660ba906 --- /dev/null +++ b/arrow-array/src/array/run_array.rs @@ -0,0 +1,672 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use arrow_buffer::ArrowNativeType; +use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_schema::{ArrowError, DataType, Field}; + +use crate::{ + builder::StringRunBuilder, + make_array, + run_iterator::RunArrayIter, + types::{Int16Type, Int32Type, Int64Type, RunEndIndexType}, + Array, ArrayAccessor, ArrayRef, PrimitiveArray, +}; + +/// +/// A run-end encoding (REE) is a variation of [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding). +/// +/// This encoding is good for representing data containing same values repeated consecutively. +/// +/// [`RunArray`] contains `run_ends` array and `values` array of same length. +/// The `run_ends` array stores the indexes at which the run ends. The `values` array +/// stores the value of each run. Below example illustrates how a logical array is represented in +/// [`RunArray`] +/// +/// +/// ```text +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐ +/// ┌─────────────────┐ ┌─────────┐ ┌─────────────────┐ +/// │ │ A │ │ 2 │ │ │ A │ +/// ├─────────────────┤ ├─────────┤ ├─────────────────┤ +/// │ │ D │ │ 3 │ │ │ A │ run length of 'A' = runs_ends[0] - 0 = 2 +/// ├─────────────────┤ ├─────────┤ ├─────────────────┤ +/// │ │ B │ │ 6 │ │ │ D │ run length of 'D' = run_ends[1] - run_ends[0] = 1 +/// └─────────────────┘ └─────────┘ ├─────────────────┤ +/// │ values run_ends │ │ B │ +/// ├─────────────────┤ +/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ │ B │ +/// ├─────────────────┤ +/// RunArray │ B │ run length of 'B' = run_ends[2] - run_ends[1] = 3 +/// length = 3 └─────────────────┘ +/// +/// Logical array +/// Contents +/// ``` + +pub struct RunArray { + data: ArrayData, + run_ends: PrimitiveArray, + values: ArrayRef, +} + +impl RunArray { + // calculates the logical length of the array encoded + // by the given run_ends array. + fn logical_len(run_ends: &PrimitiveArray) -> usize { + let len = run_ends.len(); + if len == 0 { + return 0; + } + run_ends.value(len - 1).as_usize() + } + + /// Attempts to create RunArray using given run_ends (index where a run ends) + /// and the values (value of the run). Returns an error if the given data is not compatible + /// with RunEndEncoded specification. + pub fn try_new( + run_ends: &PrimitiveArray, + values: &dyn Array, + ) -> Result { + let run_ends_type = run_ends.data_type().clone(); + let values_type = values.data_type().clone(); + let ree_array_type = DataType::RunEndEncoded( + Box::new(Field::new("run_ends", run_ends_type, false)), + Box::new(Field::new("values", values_type, true)), + ); + let len = RunArray::logical_len(run_ends); + let builder = ArrayDataBuilder::new(ree_array_type) + .len(len) + .add_child_data(run_ends.data().clone()) + .add_child_data(values.data().clone()); + + // `build_unchecked` is used to avoid recursive validation of child arrays. + let array_data = unsafe { builder.build_unchecked() }; + + // Safety: `validate_data` checks below + // 1. The given array data has exactly two child arrays. + // 2. The first child array (run_ends) has valid data type. + // 3. run_ends array does not have null values + // 4. run_ends array has non-zero and strictly increasing values. + // 5. The length of run_ends array and values array are the same. + array_data.validate_data()?; + + Ok(array_data.into()) + } + /// Returns a reference to run_ends array + /// + /// Note: any slicing of this array is not applied to the returned array + /// and must be handled separately + pub fn run_ends(&self) -> &PrimitiveArray { + &self.run_ends + } + + /// Returns a reference to values array + pub fn values(&self) -> &ArrayRef { + &self.values + } + + /// Downcast this dictionary to a [`TypedRunArray`] + /// + /// ``` + /// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type}; + /// + /// let orig = [Some("a"), Some("b"), None]; + /// let run_array = RunArray::::from_iter(orig); + /// let typed = run_array.downcast_ref::().unwrap(); + /// assert_eq!(typed.value(0), "a"); + /// assert_eq!(typed.value(1), "b"); + /// assert!(typed.values().is_null(2)); + /// ``` + /// + pub fn downcast_ref(&self) -> Option> { + let values = self.values.as_any().downcast_ref()?; + Some(TypedRunArray { + run_array: self, + values, + }) + } +} + +impl From for RunArray { + // The method assumes the caller already validated the data using `ArrayData::validate_data()` + fn from(data: ArrayData) -> Self { + match data.data_type() { + DataType::RunEndEncoded(_, _) => {} + _ => { + panic!("Invalid data type for RunArray. The data type should be DataType::RunEndEncoded"); + } + } + + let run_ends = PrimitiveArray::::from(data.child_data()[0].clone()); + let values = make_array(data.child_data()[1].clone()); + Self { + data, + run_ends, + values, + } + } +} + +impl From> for ArrayData { + fn from(array: RunArray) -> Self { + array.data + } +} + +impl Array for RunArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn data(&self) -> &ArrayData { + &self.data + } + + fn into_data(self) -> ArrayData { + self.into() + } +} + +impl std::fmt::Debug for RunArray { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "RunArray {{run_ends: {:?}, values: {:?}}}", + self.run_ends, self.values + ) + } +} + +/// Constructs a `RunArray` from an iterator of optional strings. +/// +/// # Example: +/// ``` +/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type}; +/// +/// let test = vec!["a", "a", "b", "c", "c"]; +/// let array: RunArray = test +/// .iter() +/// .map(|&x| if x == "b" { None } else { Some(x) }) +/// .collect(); +/// assert_eq!( +/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", +/// format!("{:?}", array) +/// ); +/// ``` +impl<'a, T: RunEndIndexType> FromIterator> for RunArray { + fn from_iter>>(iter: I) -> Self { + let it = iter.into_iter(); + let (lower, _) = it.size_hint(); + let mut builder = StringRunBuilder::with_capacity(lower, 256); + it.for_each(|i| { + builder.append_option(i); + }); + + builder.finish() + } +} + +/// Constructs a `RunArray` from an iterator of strings. +/// +/// # Example: +/// +/// ``` +/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type}; +/// +/// let test = vec!["a", "a", "b", "c"]; +/// let array: RunArray = test.into_iter().collect(); +/// assert_eq!( +/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", +/// format!("{:?}", array) +/// ); +/// ``` +impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray { + fn from_iter>(iter: I) -> Self { + let it = iter.into_iter(); + let (lower, _) = it.size_hint(); + let mut builder = StringRunBuilder::with_capacity(lower, 256); + it.for_each(|i| { + builder.append_value(i); + }); + + builder.finish() + } +} + +/// +/// A [`RunArray`] array where run ends are stored using `i16` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int16RunArray, Int16Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int16RunArray = RunArray; + +/// +/// A [`RunArray`] array where run ends are stored using `i32` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int32RunArray, Int32Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int32RunArray = RunArray; + +/// +/// A [`RunArray`] array where run ends are stored using `i64` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int64RunArray, Int64Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int64RunArray = RunArray; + +/// The trait defines functions that helps access the run array +/// properties and values +pub trait RunArrayAccessor { + /// Length of the physical array in [`RunArray`] + fn physical_len(&self) -> usize; + + /// The logical index at which the `physical_index` run ends. + /// i.e. value at the index `physical_index` in run_ends array. + fn run_end_index(&self, physical_index: usize) -> Option; + + /// Returns true if the value is null in the `physical_index` + fn is_value_null(&self, physical_index: usize) -> bool; +} + +/// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`] +/// and [`IntoIterator`] allowing fast access to its elements +/// +/// ``` +/// use arrow_array::{RunArray, StringArray, types::Int32Type}; +/// +/// let orig = ["a", "b", "a", "b"]; +/// let ree_array = RunArray::::from_iter(orig); +/// +/// // `TypedRunArray` allows you to access the values directly +/// let typed = ree_array.downcast_ref::().unwrap(); +/// +/// for (maybe_val, orig) in typed.into_iter().zip(orig) { +/// assert_eq!(maybe_val.unwrap(), orig) +/// } +/// ``` +pub struct TypedRunArray<'a, R: RunEndIndexType, V> { + /// The ree array + run_array: &'a RunArray, + /// The values of the run_array + values: &'a V, +} + +// Manually implement `Clone` to avoid `V: Clone` type constraint +impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> { + fn clone(&self) -> Self { + Self { + run_array: self.run_array, + values: self.values, + } + } +} + +impl<'a, R: RunEndIndexType, V> Copy for TypedRunArray<'a, R, V> {} + +impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "TypedRunArray({:?})", self.run_array) + } +} + +impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { + /// Returns the run_ends of this [`TypedRunArray`] + pub fn run_ends(&self) -> &'a PrimitiveArray { + self.run_array.run_ends() + } + + /// Returns the values of this [`TypedRunArray`] + pub fn values(&self) -> &'a V { + self.values + } +} + +impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> { + fn as_any(&self) -> &dyn Any { + self.run_array + } + + fn data(&self) -> &ArrayData { + &self.run_array.data + } + + fn into_data(self) -> ArrayData { + self.run_array.into_data() + } +} + +impl RunArrayAccessor for TypedRunArray<'_, R, V> { + fn physical_len(&self) -> usize { + self.run_ends().len() + } + + fn run_end_index(&self, physical_index: usize) -> Option { + if physical_index >= self.run_ends().len() { + None + } else { + Some(unsafe { + // Safety: + // As the physical_index bounds is checked above + // The array can be accessed without validation + self.run_ends().value_unchecked(physical_index).as_usize() + }) + } + } + + fn is_value_null(&self, physical_index: usize) -> bool { + self.run_array.values().is_null(physical_index) + } +} + +// The array accessor returns value based on physical array index. +// Its the responsibility of the caller of this function to convert from +// logical index to physical index. +impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = <&'a V as ArrayAccessor>::Item; + + fn value(&self, index: usize) -> Self::Item { + assert!( + index < self.len(), + "Trying to access an element at index {} from a TypedRunArray of length {}", + index, + self.len() + ); + unsafe { self.value_unchecked(index) } + } + + unsafe fn value_unchecked(&self, index: usize) -> Self::Item { + self.values.value_unchecked(index) + } +} + +impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + Self: ArrayAccessor, +{ + type Item = Option<::Item>; + type IntoIter = RunArrayIter; + + fn into_iter(self) -> Self::IntoIter { + RunArrayIter::new(self) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::builder::PrimitiveRunBuilder; + use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type}; + use crate::{Array, Int16Array, Int32Array, StringArray}; + + #[test] + fn test_run_array() { + // Construct a value array + let value_data = PrimitiveArray::::from_iter_values([ + 10_i8, 11, 12, 13, 14, 15, 16, 17, + ]); + + // Construct a run_ends array: + let run_ends_data = PrimitiveArray::::from_iter_values([ + 4_i16, 6, 7, 9, 13, 18, 20, 22, + ]); + + // Construct a run ends encoded array from the above two + let ree_array = + RunArray::::try_new(&run_ends_data, &value_data).unwrap(); + + assert_eq!(ree_array.len(), 22); + assert_eq!(ree_array.null_count(), 0); + + let values = ree_array.values(); + assert_eq!(&value_data.into_data(), values.data()); + assert_eq!(&DataType::Int8, values.data_type()); + + let run_ends = ree_array.run_ends(); + assert_eq!(&run_ends_data.into_data(), run_ends.data()); + assert_eq!(&DataType::Int16, run_ends.data_type()); + } + + #[test] + fn test_run_array_fmt_debug() { + let mut builder = PrimitiveRunBuilder::::with_capacity(3); + builder.append_value(12345678); + builder.append_null(); + builder.append_value(22345678); + let array = builder.finish(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray\n[\n 12345678,\n null,\n 22345678,\n]}\n", + format!("{:?}", array) + ); + + let mut builder = PrimitiveRunBuilder::::with_capacity(20); + for _ in 0..20 { + builder.append_value(1); + } + let array = builder.finish(); + + assert_eq!(array.len(), 20); + assert_eq!(array.null_count(), 0); + + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 20,\n], values: PrimitiveArray\n[\n 1,\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_from_iter() { + let test = vec!["a", "a", "b", "c"]; + let array: RunArray = test + .iter() + .map(|&x| if x == "b" { None } else { Some(x) }) + .collect(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", + format!("{:?}", array) + ); + + assert_eq!(array.len(), 4); + assert_eq!(array.null_count(), 0); + + let array: RunArray = test.into_iter().collect(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_run_ends_as_primitive_array() { + let test = vec!["a", "b", "c", "a"]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 4); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(&DataType::Int16, run_ends.data_type()); + assert_eq!(0, run_ends.null_count()); + assert_eq!(&[1, 2, 3, 4], run_ends.values()); + } + + #[test] + fn test_run_array_as_primitive_array_with_null() { + let test = vec![Some("a"), None, Some("b"), None, None, Some("a")]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 6); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(&DataType::Int32, run_ends.data_type()); + assert_eq!(0, run_ends.null_count()); + assert_eq!(5, run_ends.len()); + assert_eq!(&[1, 2, 3, 5, 6], run_ends.values()); + + let values_data = array.values(); + assert_eq!(2, values_data.null_count()); + assert_eq!(5, values_data.len()); + } + + #[test] + fn test_run_array_all_nulls() { + let test = vec![None, None, None]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 3); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(1, run_ends.len()); + assert_eq!(&[3], run_ends.values()); + + let values_data = array.values(); + assert_eq!(1, values_data.null_count()); + } + + #[test] + fn test_run_array_try_new() { + let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = + [Some(1), Some(2), Some(3), Some(4)].into_iter().collect(); + + let array = RunArray::::try_new(&run_ends, &values).unwrap(); + assert_eq!(array.run_ends().data_type(), &DataType::Int32); + assert_eq!(array.values().data_type(), &DataType::Utf8); + + assert_eq!(array.null_count(), 0); + assert_eq!(array.len(), 4); + assert_eq!(array.run_ends.null_count(), 0); + assert_eq!(array.values().null_count(), 1); + + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_int16_type_definition() { + let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); + let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); + assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); + assert_eq!(array.values(), &values); + } + + #[test] + fn test_run_array_empty_string() { + let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect(); + let values: Arc = Arc::new(StringArray::from(vec!["a", "", "c"])); + assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5])); + assert_eq!(array.values(), &values); + } + + #[test] + fn test_run_array_length_mismatch() { + let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_with_null() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("Found null values in run_ends array. The run_ends array should not have null values.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_with_zeroes() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_non_increasing() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + #[should_panic( + expected = "PrimitiveArray expected ArrayData with type Int64 got Int32" + )] + fn test_run_array_run_ends_data_type_mismatch() { + let a = RunArray::::from_iter(["32"]); + let _ = RunArray::::from(a.into_data()); + } +} diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs new file mode 100644 index 000000000000..49e3d8e23add --- /dev/null +++ b/arrow-array/src/builder/generic_byte_run_builder.rs @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::bytes::ByteArrayNativeType; +use std::{any::Any, sync::Arc}; + +use crate::{ + types::{ + BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType, + Utf8Type, + }, + ArrayRef, ArrowPrimitiveType, RunArray, +}; + +use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder}; + +use arrow_buffer::ArrowNativeType; + +/// Array builder for [`RunArray`] for String and Binary types. +/// +/// # Example: +/// +/// ``` +/// +/// # use arrow_array::builder::GenericByteRunBuilder; +/// # use arrow_array::{GenericByteArray, BinaryArray}; +/// # use arrow_array::types::{BinaryType, Int16Type}; +/// # use arrow_array::{Array, Int16Array}; +/// +/// let mut builder = +/// GenericByteRunBuilder::::new(); +/// builder.extend([Some(b"abc"), Some(b"abc"), None, Some(b"def")].into_iter()); +/// builder.append_value(b"def"); +/// builder.append_null(); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)]) +/// ); +/// +/// let av = array.values(); +/// +/// assert!(!av.is_null(0)); +/// assert!(av.is_null(1)); +/// assert!(!av.is_null(2)); +/// assert!(av.is_null(3)); +/// +/// // Values are polymorphic and so require a downcast. +/// let ava: &BinaryArray = av.as_any().downcast_ref::().unwrap(); +/// +/// assert_eq!(ava.value(0), b"abc"); +/// assert_eq!(ava.value(2), b"def"); +/// ``` +#[derive(Debug)] +pub struct GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + run_ends_builder: PrimitiveBuilder, + values_builder: GenericByteBuilder, + current_value: Vec, + has_current_value: bool, + current_run_end_index: usize, + prev_run_end_index: usize, +} + +impl Default for GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + fn default() -> Self { + Self::new() + } +} + +impl GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + /// Creates a new `GenericByteRunBuilder` + pub fn new() -> Self { + Self { + run_ends_builder: PrimitiveBuilder::new(), + values_builder: GenericByteBuilder::::new(), + current_value: Vec::new(), + has_current_value: false, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Creates a new `GenericByteRunBuilder` with the provided capacity + /// + /// `capacity`: the expected number of run-end encoded values. + /// `data_capacity`: the expected number of bytes of run end encoded values + pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self { + Self { + run_ends_builder: PrimitiveBuilder::with_capacity(capacity), + values_builder: GenericByteBuilder::::with_capacity( + capacity, + data_capacity, + ), + current_value: Vec::new(), + has_current_value: false, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Returns the physical length of the encoded array + pub fn physical_len(&self) -> usize { + let mut len = self.run_ends_builder.len(); + + // If there is an ongoing run yet to be added, include it in the len + if self.prev_run_end_index != self.current_run_end_index { + len += 1; + } + len + } +} + +impl ArrayBuilder for GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, +{ + /// Returns the builder as a non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as a mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the length of logical array encoded by + /// the eventual runs array. + fn len(&self) -> usize { + self.current_run_end_index + } + + /// Returns whether the number of array slots is zero + fn is_empty(&self) -> bool { + self.current_run_end_index == 0 + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + /// Builds the array without resetting the builder. + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } +} + +impl GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, +{ + /// Appends optional value to the logical array encoded by the RunArray. + pub fn append_option(&mut self, input_value: Option>) { + match input_value { + Some(value) => self.append_value(value), + None => self.append_null(), + } + } + + /// Appends value to the logical array encoded by the RunArray. + pub fn append_value(&mut self, input_value: impl AsRef) { + let value: &[u8] = input_value.as_ref().as_ref(); + if !self.has_current_value { + self.append_run_end(); + self.current_value.extend_from_slice(value); + self.has_current_value = true; + } else if self.current_value.as_slice() != value { + self.append_run_end(); + self.current_value.clear(); + self.current_value.extend_from_slice(value); + } + self.current_run_end_index += 1; + } + + /// Appends null to the logical array encoded by the RunArray. + pub fn append_null(&mut self) { + if self.has_current_value { + self.append_run_end(); + self.current_value.clear(); + self.has_current_value = false; + } + self.current_run_end_index += 1; + } + + /// Creates the RunArray and resets the builder. + /// Panics if RunArray cannot be built. + pub fn finish(&mut self) -> RunArray { + // write the last run end to the array. + self.append_run_end(); + + // reset the run end index to zero. + self.current_value.clear(); + self.has_current_value = false; + self.current_run_end_index = 0; + self.prev_run_end_index = 0; + + // build the run encoded array by adding run_ends and values array as its children. + let run_ends_array = self.run_ends_builder.finish(); + let values_array = self.values_builder.finish(); + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + /// Creates the RunArray and without resetting the builder. + /// Panics if RunArray cannot be built. + pub fn finish_cloned(&self) -> RunArray { + let mut run_ends_array = self.run_ends_builder.finish_cloned(); + let mut values_array = self.values_builder.finish_cloned(); + + // Add current run if one exists + if self.prev_run_end_index != self.current_run_end_index { + let mut run_end_builder = run_ends_array.into_builder().unwrap(); + let mut values_builder = values_array.into_builder().unwrap(); + self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder); + run_ends_array = run_end_builder.finish(); + values_array = values_builder.finish(); + } + + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + // Appends the current run to the array. + fn append_run_end(&mut self) { + // empty array or the function called without appending any value. + if self.prev_run_end_index == self.current_run_end_index { + return; + } + let run_end_index = R::Native::from_usize(self.current_run_end_index) + .unwrap_or_else(|| panic!( + "Cannot convert the value {} from `usize` to native form of arrow datatype {}", + self.current_run_end_index, + R::DATA_TYPE + )); + self.run_ends_builder.append_value(run_end_index); + if self.has_current_value { + let slice = self.current_value.as_slice(); + let native = unsafe { + // Safety: + // As self.current_value is created from V::Native. The value V::Native can be + // built back from the bytes without validations + V::Native::from_bytes_unchecked(slice) + }; + self.values_builder.append_value(native); + } else { + self.values_builder.append_null(); + } + self.prev_run_end_index = self.current_run_end_index; + } + + // Similar to `append_run_end` but on custom builders. + fn append_run_end_with_builders( + &self, + run_ends_builder: &mut PrimitiveBuilder, + values_builder: &mut GenericByteBuilder, + ) { + let run_end_index = R::Native::from_usize(self.current_run_end_index) + .unwrap_or_else(|| panic!( + "Cannot convert the value {} from `usize` to native form of arrow datatype {}", + self.current_run_end_index, + R::DATA_TYPE + )); + run_ends_builder.append_value(run_end_index); + if self.has_current_value { + let slice = self.current_value.as_slice(); + let native = unsafe { + // Safety: + // As self.current_value is created from V::Native. The value V::Native can be + // built back from the bytes without validations + V::Native::from_bytes_unchecked(slice) + }; + values_builder.append_value(native); + } else { + values_builder.append_null(); + } + } +} + +impl Extend> for GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, + S: AsRef, +{ + fn extend>>(&mut self, iter: T) { + for elem in iter { + self.append_option(elem); + } + } +} + +/// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]). +/// +/// ``` +/// // Create a run-end encoded array with run-end indexes data type as `i16`. +/// // The encoded values are Strings. +/// +/// # use arrow_array::builder::StringRunBuilder; +/// # use arrow_array::{Int16Array, StringArray}; +/// # use arrow_array::types::Int16Type; +/// +/// let mut builder = StringRunBuilder::::new(); +/// +/// // The builder builds the dictionary value by value +/// builder.append_value("abc"); +/// builder.append_null(); +/// builder.extend([Some("def"), Some("def"), Some("abc")]); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) +/// ); +/// +/// // Values are polymorphic and so require a downcast. +/// let av = array.values(); +/// let ava: &StringArray = av.as_any().downcast_ref::().unwrap(); +/// +/// assert_eq!(ava.value(0), "abc"); +/// assert!(av.is_null(1)); +/// assert_eq!(ava.value(2), "def"); +/// assert_eq!(ava.value(3), "abc"); +/// +/// ``` +pub type StringRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes large strings ([`LargeUtf8Type`]). See [`StringRunBuilder`] for an example. +pub type LargeStringRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes binary values([`BinaryType`]). +/// +/// ``` +/// // Create a run-end encoded array with run-end indexes data type as `i16`. +/// // The encoded data is binary values. +/// +/// # use arrow_array::builder::BinaryRunBuilder; +/// # use arrow_array::{BinaryArray, Int16Array}; +/// # use arrow_array::types::Int16Type; +/// +/// let mut builder = BinaryRunBuilder::::new(); +/// +/// // The builder builds the dictionary value by value +/// builder.append_value(b"abc"); +/// builder.append_null(); +/// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) +/// ); +/// +/// // Values are polymorphic and so require a downcast. +/// let av = array.values(); +/// let ava: &BinaryArray = av.as_any().downcast_ref::().unwrap(); +/// +/// assert_eq!(ava.value(0), b"abc"); +/// assert!(av.is_null(1)); +/// assert_eq!(ava.value(2), b"def"); +/// assert_eq!(ava.value(3), b"abc"); +/// +/// ``` +pub type BinaryRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes large binary values([`LargeBinaryType`]). +/// See documentation of [`BinaryRunBuilder`] for an example. +pub type LargeBinaryRunBuilder = GenericByteRunBuilder; + +#[cfg(test)] +mod tests { + use super::*; + + use crate::array::Array; + use crate::cast::as_primitive_array; + use crate::cast::as_string_array; + use crate::types::{Int16Type, Int32Type}; + use crate::GenericByteArray; + use crate::Int16Array; + use crate::Int16RunArray; + + fn test_bytes_run_buider(values: Vec<&T::Native>) + where + T: ByteArrayType, + ::Native: PartialEq, + ::Native: AsRef<::Native>, + { + let mut builder = GenericByteRunBuilder::::new(); + builder.append_value(values[0]); + builder.append_value(values[0]); + builder.append_value(values[0]); + builder.append_null(); + builder.append_null(); + builder.append_value(values[1]); + builder.append_value(values[1]); + builder.append_value(values[2]); + builder.append_value(values[2]); + builder.append_value(values[2]); + builder.append_value(values[2]); + let array = builder.finish(); + + assert_eq!(array.len(), 11); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &GenericByteArray = + av.as_any().downcast_ref::>().unwrap(); + + assert_eq!(*ava.value(0), *values[0]); + assert!(ava.is_null(1)); + assert_eq!(*ava.value(2), *values[1]); + assert_eq!(*ava.value(3), *values[2]); + } + + #[test] + fn test_string_run_buider() { + test_bytes_run_buider::(vec!["abc", "def", "ghi"]); + } + + #[test] + fn test_string_run_buider_with_empty_strings() { + test_bytes_run_buider::(vec!["abc", "", "ghi"]); + } + + #[test] + fn test_binary_run_buider() { + test_bytes_run_buider::(vec![b"abc", b"def", b"ghi"]); + } + + fn test_bytes_run_buider_finish_cloned(values: Vec<&T::Native>) + where + T: ByteArrayType, + ::Native: PartialEq, + ::Native: AsRef<::Native>, + { + let mut builder = GenericByteRunBuilder::::new(); + + builder.append_value(values[0]); + builder.append_null(); + + assert_eq!(builder.physical_len(), 2); + + builder.append_value(values[1]); + + assert_eq!(builder.physical_len(), 3); + + builder.append_value(values[1]); + builder.append_value(values[0]); + let mut array: Int16RunArray = builder.finish_cloned(); + + assert_eq!(array.len(), 5); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &GenericByteArray = + av.as_any().downcast_ref::>().unwrap(); + + assert_eq!(ava.value(0), values[0]); + assert!(ava.is_null(1)); + assert_eq!(ava.value(2), values[1]); + assert_eq!(ava.value(3), values[0]); + + // Append last value before `finish_cloned` (`value[0]`) again and ensure it has only + // one entry in final output. + builder.append_value(values[0]); + + assert_eq!(builder.physical_len(), 4); + + builder.append_value(values[0]); + builder.append_value(values[1]); + assert_eq!(builder.physical_len(), 5); + + array = builder.finish(); + + assert_eq!(array.len(), 8); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), Some(8),]) + ); + + // Values are polymorphic and so require a downcast. + let av2 = array.values(); + let ava2: &GenericByteArray = + av2.as_any().downcast_ref::>().unwrap(); + + assert_eq!(ava2.value(0), values[0]); + assert!(ava2.is_null(1)); + assert_eq!(ava2.value(2), values[1]); + // The value appended before and after `finish_cloned` has only one entry. + assert_eq!(ava2.value(3), values[0]); + assert_eq!(ava2.value(4), values[1]); + } + + #[test] + fn test_string_run_buider_finish_cloned() { + test_bytes_run_buider_finish_cloned::(vec!["abc", "def", "ghi"]); + } + + #[test] + fn test_binary_run_buider_finish_cloned() { + test_bytes_run_buider_finish_cloned::(vec![b"abc", b"def", b"ghi"]); + } + + #[test] + fn test_extend() { + let mut builder = StringRunBuilder::::new(); + builder.extend(["a", "a", "a", "", "", "b", "b"].into_iter().map(Some)); + builder.extend(["b", "cupcakes", "cupcakes"].into_iter().map(Some)); + let array = builder.finish(); + + assert_eq!(array.len(), 10); + assert_eq!( + as_primitive_array::(array.run_ends()).values(), + &[3, 5, 8, 10] + ); + + let str_array = as_string_array(array.values().as_ref()); + assert_eq!(str_array.value(0), "a"); + assert_eq!(str_array.value(1), ""); + assert_eq!(str_array.value(2), "b"); + assert_eq!(str_array.value(3), "cupcakes"); + } +} diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs index 820ecd23bc5e..fc2454635d99 100644 --- a/arrow-array/src/builder/mod.rs +++ b/arrow-array/src/builder/mod.rs @@ -39,10 +39,14 @@ mod primitive_builder; pub use primitive_builder::*; mod primitive_dictionary_builder; pub use primitive_dictionary_builder::*; +mod primitive_run_builder; +pub use primitive_run_builder::*; mod struct_builder; pub use struct_builder::*; mod generic_bytes_dictionary_builder; pub use generic_bytes_dictionary_builder::*; +mod generic_byte_run_builder; +pub use generic_byte_run_builder::*; mod union_builder; pub use union_builder::*; diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs new file mode 100644 index 000000000000..4d5dcea5bd34 --- /dev/null +++ b/arrow-array/src/builder/primitive_run_builder.rs @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use crate::{types::RunEndIndexType, ArrayRef, ArrowPrimitiveType, RunArray}; + +use super::{ArrayBuilder, PrimitiveBuilder}; + +use arrow_buffer::ArrowNativeType; + +/// Array builder for [`RunArray`] that encodes primitive values. +/// +/// # Example: +/// +/// ``` +/// +/// # use arrow_array::builder::PrimitiveRunBuilder; +/// # use arrow_array::cast::as_primitive_array; +/// # use arrow_array::types::{UInt32Type, Int16Type}; +/// # use arrow_array::{Array, UInt32Array, Int16Array}; +/// +/// let mut builder = +/// PrimitiveRunBuilder::::new(); +/// builder.append_value(1234); +/// builder.append_value(1234); +/// builder.append_value(1234); +/// builder.append_null(); +/// builder.append_value(5678); +/// builder.append_value(5678); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(3), Some(4), Some(6)]) +/// ); +/// +/// let av = array.values(); +/// +/// assert!(!av.is_null(0)); +/// assert!(av.is_null(1)); +/// assert!(!av.is_null(2)); +/// +/// // Values are polymorphic and so require a downcast. +/// let ava: &UInt32Array = as_primitive_array::(av.as_ref()); +/// +/// assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)])); +/// ``` +#[derive(Debug)] +pub struct PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + run_ends_builder: PrimitiveBuilder, + values_builder: PrimitiveBuilder, + current_value: Option, + current_run_end_index: usize, + prev_run_end_index: usize, +} + +impl Default for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + fn default() -> Self { + Self::new() + } +} + +impl PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Creates a new `PrimitiveRunBuilder` + pub fn new() -> Self { + Self { + run_ends_builder: PrimitiveBuilder::new(), + values_builder: PrimitiveBuilder::new(), + current_value: None, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Creates a new `PrimitiveRunBuilder` with the provided capacity + /// + /// `capacity`: the expected number of run-end encoded values. + pub fn with_capacity(capacity: usize) -> Self { + Self { + run_ends_builder: PrimitiveBuilder::with_capacity(capacity), + values_builder: PrimitiveBuilder::with_capacity(capacity), + current_value: None, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Returns the physical length of the encoded array + pub fn physical_len(&self) -> usize { + let mut len = self.run_ends_builder.len(); + + // If there is an ongoing run yet to be added, include it in the len + if self.prev_run_end_index != self.current_run_end_index { + len += 1; + } + len + } +} + +impl ArrayBuilder for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Returns the builder as a non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as a mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the length of logical array encoded by + /// the eventual runs array. + fn len(&self) -> usize { + self.current_run_end_index + } + + /// Returns whether the number of array slots is zero + fn is_empty(&self) -> bool { + self.current_run_end_index == 0 + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + /// Builds the array without resetting the builder. + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } +} + +impl PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Appends optional value to the logical array encoded by the RunArray. + pub fn append_option(&mut self, value: Option) { + if self.current_run_end_index == 0 { + self.current_run_end_index = 1; + self.current_value = value; + return; + } + if self.current_value != value { + self.append_run_end(); + self.current_value = value; + } + + self.current_run_end_index += 1; + } + + /// Appends value to the logical array encoded by the run-ends array. + pub fn append_value(&mut self, value: V::Native) { + self.append_option(Some(value)) + } + + /// Appends null to the logical array encoded by the run-ends array. + pub fn append_null(&mut self) { + self.append_option(None) + } + + /// Creates the RunArray and resets the builder. + /// Panics if RunArray cannot be built. + pub fn finish(&mut self) -> RunArray { + // write the last run end to the array. + self.append_run_end(); + + // reset the run index to zero. + self.current_value = None; + self.current_run_end_index = 0; + + // build the run encoded array by adding run_ends and values array as its children. + let run_ends_array = self.run_ends_builder.finish(); + let values_array = self.values_builder.finish(); + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + /// Creates the RunArray and without resetting the builder. + /// Panics if RunArray cannot be built. + pub fn finish_cloned(&self) -> RunArray { + let mut run_ends_array = self.run_ends_builder.finish_cloned(); + let mut values_array = self.values_builder.finish_cloned(); + + // Add current run if one exists + if self.prev_run_end_index != self.current_run_end_index { + let mut run_end_builder = run_ends_array.into_builder().unwrap(); + let mut values_builder = values_array.into_builder().unwrap(); + self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder); + run_ends_array = run_end_builder.finish(); + values_array = values_builder.finish(); + } + + RunArray::try_new(&run_ends_array, &values_array).unwrap() + } + + // Appends the current run to the array. There are scenarios where this function can be called + // multiple times before getting a new value. e.g. appending different value immediately following + // finish_cloned. + fn append_run_end(&mut self) { + // empty array or the function called without appending any value. + if self.prev_run_end_index == self.current_run_end_index { + return; + } + let run_end_index = self.run_end_index_as_native(); + self.run_ends_builder.append_value(run_end_index); + self.values_builder.append_option(self.current_value); + self.prev_run_end_index = self.current_run_end_index; + } + + // Similar to `append_run_end` but on custom builders. + fn append_run_end_with_builders( + &self, + run_ends_builder: &mut PrimitiveBuilder, + values_builder: &mut PrimitiveBuilder, + ) { + let run_end_index = self.run_end_index_as_native(); + run_ends_builder.append_value(run_end_index); + values_builder.append_option(self.current_value); + } + + fn run_end_index_as_native(&self) -> R::Native { + R::Native::from_usize(self.current_run_end_index) + .unwrap_or_else(|| panic!( + "Cannot convert the value {} from `usize` to native form of arrow datatype {}", + self.current_run_end_index, + R::DATA_TYPE + )) + } +} + +impl Extend> for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + fn extend>>(&mut self, iter: T) { + for elem in iter { + self.append_option(elem); + } + } +} + +#[cfg(test)] +mod tests { + use crate::builder::PrimitiveRunBuilder; + use crate::cast::as_primitive_array; + use crate::types::{Int16Type, UInt32Type}; + use crate::{Array, Int16Array, UInt32Array}; + #[test] + fn test_primitive_ree_array_builder() { + let mut builder = PrimitiveRunBuilder::::new(); + builder.append_value(1234); + builder.append_value(1234); + builder.append_value(1234); + + assert_eq!(builder.physical_len(), 1); + + builder.append_null(); + assert_eq!(builder.physical_len(), 2); + + builder.append_value(5678); + builder.append_value(5678); + assert_eq!(builder.physical_len(), 3); + + let array = builder.finish(); + + assert_eq!(array.null_count(), 0); + assert_eq!(array.len(), 6); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(3), Some(4), Some(6)]) + ); + + let av = array.values(); + + assert!(!av.is_null(0)); + assert!(av.is_null(1)); + assert!(!av.is_null(2)); + + // Values are polymorphic and so require a downcast. + let ava: &UInt32Array = as_primitive_array::(av.as_ref()); + + assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)])); + } + + #[test] + fn test_extend() { + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend([1, 2, 2, 5, 5, 4, 4].into_iter().map(Some)); + builder.extend([4, 4, 6, 2].into_iter().map(Some)); + let array = builder.finish(); + + assert_eq!(array.len(), 11); + assert_eq!(array.null_count(), 0); + assert_eq!( + as_primitive_array::(array.run_ends()).values(), + &[1, 3, 5, 9, 10, 11] + ); + assert_eq!( + as_primitive_array::(array.values().as_ref()).values(), + &[1, 2, 5, 4, 6, 2] + ); + } +} diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs index d6a9ab30b85b..d8dc6efe25be 100644 --- a/arrow-array/src/lib.rs +++ b/arrow-array/src/lib.rs @@ -178,6 +178,7 @@ pub mod cast; mod delta; pub mod iterator; mod raw_pointer; +pub mod run_iterator; pub mod temporal_conversions; pub mod timezone; mod trusted_len; diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs new file mode 100644 index 000000000000..aa5997ead080 --- /dev/null +++ b/arrow-array/src/run_iterator.rs @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Idiomatic iterator for [`RunArray`](crate::Array) + +use crate::{array::ArrayAccessor, RunArrayAccessor}; + +/// An iterator that returns Some(T) or None, that can be used on any [`ArrayAccessor`] +/// +/// # Performance +/// +/// [`RunArrayIter`] provides an idiomatic way to iterate over an array, however, this +/// comes at the cost of performance. In particular the interleaved handling of +/// the null mask is often sub-optimal. +/// +/// If performing an infallible operation, it is typically faster to perform the operation +/// on every index of the array, and handle the null mask separately. For [`PrimitiveArray`] +/// this functionality is provided by [`compute::unary`] +/// +/// If performing a fallible operation, it isn't possible to perform the operation independently +/// of the null mask, as this might result in a spurious failure on a null index. However, +/// there are more efficient ways to iterate over just the non-null indices, this functionality +/// is provided by [`compute::try_unary`] +/// +/// [`PrimitiveArray`]: crate::PrimitiveArray +/// [`compute::unary`]: https://docs.rs/arrow/latest/arrow/compute/fn.unary.html +/// [`compute::try_unary`]: https://docs.rs/arrow/latest/arrow/compute/fn.try_unary.html +#[derive(Debug)] +pub struct RunArrayIter { + array: T, + current_logical: usize, + current_physical: usize, + current_end_logical: usize, + current_end_physical: usize, +} + +impl RunArrayIter { + /// create a new iterator + pub fn new(array: T) -> Self { + let logical_len = array.len(); + let physical_len: usize = array.physical_len(); + RunArrayIter { + array, + current_logical: 0, + current_physical: 0, + current_end_logical: logical_len, + current_end_physical: physical_len, + } + } +} + +impl Iterator for RunArrayIter { + type Item = Option; + + #[inline] + fn next(&mut self) -> Option { + if self.current_logical == self.current_end_logical { + return None; + } + // If current logical index is greater than current run end index then increment + // the physical index. + match self.array.run_end_index(self.current_physical) { + None => { + // The self.current_physical shold not go out of bounds as its + // kept within the bounds of self.current_logical. + panic!( + "Could not get run end index for physical index {}", + self.current_physical + ); + } + Some(run_end_index) if self.current_logical >= run_end_index => { + //As the run_ends is expected to be strictly increasing, there + // should be at least one logical entry in one physical entry. Because of this + // reason we dont have to increment the physical index multiple times to get to next + // logical index. + self.current_physical += 1; + } + _ => {} + } + if self.array.is_value_null(self.current_physical) { + self.current_logical += 1; + Some(None) + } else { + self.current_logical += 1; + // Safety: + // The self.current_physical is kept within bounds of self.current_logical. + // The self.current_logical will not go out of bounds because of the check + // `self.current_logical = self.current_end_logical` above. + unsafe { Some(Some(self.array.value_unchecked(self.current_physical))) } + } + } + + fn size_hint(&self) -> (usize, Option) { + ( + self.current_end_logical - self.current_logical, + Some(self.current_end_logical - self.current_logical), + ) + } +} + +impl DoubleEndedIterator for RunArrayIter { + fn next_back(&mut self) -> Option { + if self.current_end_logical == self.current_logical { + None + } else { + self.current_end_logical -= 1; + if self.current_end_physical > 0 + && self.current_end_logical + < self + .array + .run_end_index(self.current_end_physical - 1) + .unwrap() + { + self.current_end_physical -= 1; + } + Some(if self.array.is_value_null(self.current_end_physical) { + None + } else { + // Safety: + // The check `self.current_end_physical > 0` ensures we don't underflow + // the variable. Also self.current_end_physical starts with array.len() + // and decrements based on the bounds of self.current_end_logical. + unsafe { Some(self.array.value_unchecked(self.current_end_physical)) } + }) + } + } +} + +/// all arrays have known size. +impl ExactSizeIterator for RunArrayIter {} + +#[cfg(test)] +mod tests { + use crate::{ + array::{Int32Array, StringArray}, + builder::PrimitiveRunBuilder, + types::Int32Type, + Int64RunArray, + }; + + #[test] + fn test_primitive_array_iter_round_trip() { + let mut input_vec = vec![ + Some(32), + Some(32), + None, + Some(64), + Some(64), + Some(64), + Some(72), + ]; + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend(input_vec.clone().into_iter()); + let ree_array = builder.finish(); + let ree_array = ree_array.downcast_ref::().unwrap(); + + let output_vec: Vec> = ree_array.into_iter().collect(); + assert_eq!(input_vec, output_vec); + + let rev_output_vec: Vec> = ree_array.into_iter().rev().collect(); + input_vec.reverse(); + assert_eq!(input_vec, rev_output_vec); + } + + #[test] + fn test_double_ended() { + let input_vec = vec![ + Some(32), + Some(32), + None, + Some(64), + Some(64), + Some(64), + Some(72), + ]; + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend(input_vec.into_iter()); + let ree_array = builder.finish(); + let ree_array = ree_array.downcast_ref::().unwrap(); + + let mut iter = ree_array.into_iter(); + assert_eq!(Some(Some(32)), iter.next()); + assert_eq!(Some(Some(72)), iter.next_back()); + assert_eq!(Some(Some(32)), iter.next()); + assert_eq!(Some(Some(64)), iter.next_back()); + assert_eq!(Some(None), iter.next()); + assert_eq!(Some(Some(64)), iter.next_back()); + assert_eq!(Some(Some(64)), iter.next()); + assert_eq!(None, iter.next_back()); + assert_eq!(None, iter.next()); + } + + #[test] + fn test_string_array_iter_round_trip() { + let input_vec = vec!["ab", "ab", "ba", "cc", "cc"]; + let input_ree_array: Int64RunArray = input_vec.into_iter().collect(); + let string_ree_array = input_ree_array.downcast_ref::().unwrap(); + + // to and from iter, with a +1 + let result: Vec> = string_ree_array + .into_iter() + .map(|e| { + e.map(|e| { + let mut a = e.to_string(); + a.push('b'); + a + }) + }) + .collect(); + + let result_asref: Vec> = + result.iter().map(|f| f.as_deref()).collect(); + + let expected_vec = vec![ + Some("abb"), + Some("abb"), + Some("bab"), + Some("ccb"), + Some("ccb"), + ]; + + assert_eq!(expected_vec, result_asref); + } +} diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 7c41a469e30e..fc02c0e5a3dc 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -240,6 +240,31 @@ impl ArrowDictionaryKeyType for UInt32Type {} impl ArrowDictionaryKeyType for UInt64Type {} +mod run { + use super::*; + + pub trait RunEndTypeSealed {} + + impl RunEndTypeSealed for Int16Type {} + + impl RunEndTypeSealed for Int32Type {} + + impl RunEndTypeSealed for Int64Type {} +} + +/// A subtype of primitive type that is used as run-ends index +/// in `RunArray`. +/// See +/// +/// Note: The implementation of this trait is sealed to avoid accidental misuse. +pub trait RunEndIndexType: ArrowPrimitiveType + run::RunEndTypeSealed {} + +impl RunEndIndexType for Int16Type {} + +impl RunEndIndexType for Int32Type {} + +impl RunEndIndexType for Int64Type {} + /// A subtype of primitive type that represents temporal values. pub trait ArrowTemporalType: ArrowPrimitiveType {} diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 258ee082da1b..07bbc664234a 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -198,9 +198,9 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff ], _ => unreachable!(), }, - DataType::FixedSizeList(_, _) | DataType::Struct(_) => { - [empty_buffer, MutableBuffer::new(0)] - } + DataType::FixedSizeList(_, _) + | DataType::Struct(_) + | DataType::RunEndEncoded(_, _) => [empty_buffer, MutableBuffer::new(0)], DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => [ MutableBuffer::new(capacity * mem::size_of::()), empty_buffer, @@ -724,6 +724,12 @@ impl ArrayData { DataType::Dictionary(_, data_type) => { vec![Self::new_empty(data_type)] } + DataType::RunEndEncoded(run_ends, values) => { + vec![ + Self::new_empty(run_ends.data_type()), + Self::new_empty(values.data_type()), + ] + } }; // Data was constructed correctly above @@ -853,6 +859,19 @@ impl ArrayData { ))); } } + DataType::RunEndEncoded(run_ends_type, _) => { + if run_ends_type.is_nullable() { + return Err(ArrowError::InvalidArgumentError( + "The nullable should be set to false for the field defining run_ends array.".to_string() + )); + } + if !DataType::is_run_ends_type(run_ends_type.data_type()) { + return Err(ArrowError::InvalidArgumentError(format!( + "RunArray run_ends types must be Int16, Int32 or Int64, but was {}", + run_ends_type.data_type() + ))); + } + } _ => {} }; @@ -998,6 +1017,25 @@ impl ArrayData { } Ok(()) } + DataType::RunEndEncoded(run_ends_field, values_field) => { + self.validate_num_child_data(2)?; + let run_ends_data = + self.get_valid_child_data(0, run_ends_field.data_type())?; + let values_data = + self.get_valid_child_data(1, values_field.data_type())?; + if run_ends_data.len != values_data.len { + return Err(ArrowError::InvalidArgumentError(format!( + "The run_ends array length should be the same as values array length. Run_ends array length is {}, values array length is {}", + run_ends_data.len, values_data.len + ))); + } + if run_ends_data.null_count() > 0 { + return Err(ArrowError::InvalidArgumentError( + "Found null values in run_ends array. The run_ends array should not have null values.".to_string(), + )); + } + Ok(()) + } DataType::Union(fields, _, mode) => { self.validate_num_child_data(fields.len())?; @@ -1286,6 +1324,15 @@ impl ArrayData { _ => unreachable!(), } } + DataType::RunEndEncoded(run_ends, _values) => { + let run_ends_data = self.child_data()[0].clone(); + match run_ends.data_type() { + DataType::Int16 => run_ends_data.check_run_ends::(self.len()), + DataType::Int32 => run_ends_data.check_run_ends::(self.len()), + DataType::Int64 => run_ends_data.check_run_ends::(self.len()), + _ => unreachable!(), + } + } _ => { // No extra validation check required for other types Ok(()) @@ -1446,6 +1493,50 @@ impl ArrayData { }) } + /// Validates that each value in run_ends array is positive and strictly increasing. + fn check_run_ends(&self, array_len: usize) -> Result<(), ArrowError> + where + T: ArrowNativeType + TryInto + num::Num + std::fmt::Display, + { + let values = self.typed_buffer::(0, self.len())?; + let mut prev_value: i64 = 0_i64; + values.iter().enumerate().try_for_each(|(ix, &inp_value)| { + let value: i64 = inp_value.try_into().map_err(|_| { + ArrowError::InvalidArgumentError(format!( + "Value at position {} out of bounds: {} (can not convert to i64)", + ix, inp_value + )) + })?; + if value <= 0_i64 { + return Err(ArrowError::InvalidArgumentError(format!( + "The values in run_ends array should be strictly positive. Found value {} at index {} that does not match the criteria.", + value, + ix + ))); + } + if ix > 0 && value <= prev_value { + return Err(ArrowError::InvalidArgumentError(format!( + "The values in run_ends array should be strictly increasing. Found value {} at index {} with previous value {} that does not match the criteria.", + value, + ix, + prev_value + ))); + } + + prev_value = value; + Ok(()) + })?; + + if prev_value.as_usize() != array_len { + return Err(ArrowError::InvalidArgumentError(format!( + "The length of array does not match the last value in the run_ends array. The last value of run_ends array is {} and length of array is {}.", + prev_value, + array_len + ))); + } + Ok(()) + } + /// Returns true if this `ArrayData` is equal to `other`, using pointer comparisons /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may /// return false when the arrays are logically equal @@ -1542,6 +1633,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::()), DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data, + DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data, DataType::Union(_, _, mode) => { let type_ids = BufferSpec::FixedWidth { byte_width: size_of::(), diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs index 85c595cfed1c..aff61e3d37e5 100644 --- a/arrow-data/src/equal/mod.rs +++ b/arrow-data/src/equal/mod.rs @@ -137,6 +137,7 @@ fn equal_values( }, DataType::Float16 => primitive_equal::(lhs, rhs, lhs_start, rhs_start, len), DataType::Map(_, _) => list_equal::(lhs, rhs, lhs_start, rhs_start, len), + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index 6a8c89d25a22..2a24b1cc2662 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -230,6 +230,7 @@ fn build_extend(array: &ArrayData) -> Extend { UnionMode::Sparse => union::build_extend_sparse(array), UnionMode::Dense => union::build_extend_dense(array), }, + DataType::RunEndEncoded(_, _) => todo!(), } } @@ -281,6 +282,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls { UnionMode::Sparse => union::extend_nulls_sparse, UnionMode::Dense => union::extend_nulls_dense, }, + DataType::RunEndEncoded(_, _) => todo!(), }) } @@ -473,6 +475,20 @@ impl<'a> MutableArrayData<'a> { }) .collect::>(), }, + DataType::RunEndEncoded(_, _) => { + let run_ends_child = arrays + .iter() + .map(|array| &array.child_data()[0]) + .collect::>(); + let value_child = arrays + .iter() + .map(|array| &array.child_data()[1]) + .collect::>(); + vec![ + MutableArrayData::new(run_ends_child, false, array_capacity), + MutableArrayData::new(value_child, use_nulls, array_capacity), + ] + } DataType::FixedSizeList(_, _) => { let childs = arrays .iter() diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs index dd0b95b0a836..c2e326b4f2f3 100644 --- a/arrow-integration-test/src/datatype.rs +++ b/arrow-integration-test/src/datatype.rs @@ -357,6 +357,7 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value { DataType::Map(_, keys_sorted) => { json!({"name": "map", "keysSorted": keys_sorted}) } + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index a60a19b866cb..305bb943cbbf 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -711,6 +711,7 @@ pub(crate) fn get_fb_field_type<'a>( children: Some(fbb.create_vector(&children[..])), } } + RunEndEncoded(_, _) => todo!(), Map(map_field, keys_sorted) => { let child = build_field(fbb, map_field); let mut field_type = crate::MapBuilder::new(fbb); diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs index da1c20ddbd38..1e5c1321c952 100644 --- a/arrow-schema/src/datatype.rs +++ b/arrow-schema/src/datatype.rs @@ -242,6 +242,18 @@ pub enum DataType { /// child fields may be respectively "entries", "key", and "value", but this is /// not enforced. Map(Box, bool), + /// A run-end encoding (REE) is a variation of run-length encoding (RLE). These + /// encodings are well-suited for representing data containing sequences of the + /// same value, called runs. Each run is represented as a value and an integer giving + /// the index in the array where the run ends. + /// + /// A run-end encoded array has no buffers by itself, but has two child arrays. The + /// first child array, called the run ends array, holds either 16, 32, or 64-bit + /// signed integers. The actual values of each run are held in the second child array. + /// + /// These child arrays are prescribed the standard names of "run_ends" and "values" + /// respectively. + RunEndEncoded(Box, Box), } /// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds. @@ -346,6 +358,13 @@ impl DataType { ) } + /// Returns true if this type is valid for run-ends array in RunArray + #[inline] + pub fn is_run_ends_type(&self) -> bool { + use DataType::*; + matches!(self, Int16 | Int32 | Int64) + } + /// Returns true if this type is nested (List, FixedSizeList, LargeList, Struct, Union, /// or Map), or a dictionary of a nested type pub fn is_nested(&self) -> bool { @@ -438,6 +457,10 @@ impl DataType { + (std::mem::size_of::() * fields.capacity()) } DataType::Dictionary(dt1, dt2) => dt1.size() + dt2.size(), + DataType::RunEndEncoded(run_ends, values) => { + run_ends.size() - std::mem::size_of_val(run_ends) + values.size() + - std::mem::size_of_val(values) + } } } } diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs index ea60572b3d4d..6213af8bcf10 100644 --- a/arrow-schema/src/error.rs +++ b/arrow-schema/src/error.rs @@ -41,6 +41,7 @@ pub enum ArrowError { /// Error during import or export to/from the C Data Interface CDataInterface(String), DictionaryKeyOverflowError, + RunEndIndexOverflowError, } impl ArrowError { @@ -96,6 +97,9 @@ impl Display for ArrowError { ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } + ArrowError::RunEndIndexOverflowError => { + write!(f, "Run end encoded array index overflow error") + } } } } diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index a3275dcb3355..dc3ab3d6237f 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -410,6 +410,7 @@ impl Field { | DataType::List(_) | DataType::Map(_, _) | DataType::Dictionary(_, _) + | DataType::RunEndEncoded(_, _) | DataType::FixedSizeList(_, _) | DataType::FixedSizeBinary(_) | DataType::Utf8 diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index ee926ee52868..decfeb949a08 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -237,6 +237,10 @@ required-features = ["test_utils"] name = "string_dictionary_builder" harness = false +[[bench]] +name = "string_run_builder" +harness = false + [[bench]] name = "substring_kernels" harness = false diff --git a/arrow/benches/string_run_builder.rs b/arrow/benches/string_run_builder.rs new file mode 100644 index 000000000000..608de4397d92 --- /dev/null +++ b/arrow/benches/string_run_builder.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::StringRunBuilder; +use arrow::datatypes::Int32Type; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::{thread_rng, Rng}; + +fn build_strings( + physical_array_len: usize, + logical_array_len: usize, + string_len: usize, +) -> Vec { + let mut rng = thread_rng(); + let run_len = logical_array_len / physical_array_len; + let mut values: Vec = (0..physical_array_len) + .map(|_| (0..string_len).map(|_| rng.gen::()).collect()) + .flat_map(|s| std::iter::repeat(s).take(run_len)) + .collect(); + while values.len() < logical_array_len { + let last_val = values[values.len() - 1].clone(); + values.push(last_val); + } + values +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("string_run_builder"); + + let mut do_bench = + |physical_array_len: usize, logical_array_len: usize, string_len: usize| { + group.bench_function( + format!( + "(run_array_len:{}, physical_array_len:{}, string_len: {})", + logical_array_len, physical_array_len, string_len + ), + |b| { + let strings = + build_strings(physical_array_len, logical_array_len, string_len); + b.iter(|| { + let mut builder = StringRunBuilder::::with_capacity( + physical_array_len, + (string_len + 1) * physical_array_len, + ); + + for val in &strings { + builder.append_value(val); + } + + builder.finish(); + }) + }, + ); + }; + + do_bench(20, 1000, 5); + do_bench(100, 1000, 5); + do_bench(100, 1000, 10); + do_bench(100, 10000, 10); + do_bench(100, 10000, 100); + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 311981593718..c459d40d73b9 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -360,7 +360,7 @@ fn write_leaves( ArrowDataType::Float16 => Err(ParquetError::ArrowError( "Float16 arrays not supported".to_string(), )), - ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) => { + ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) | ArrowDataType::RunEndEncoded(_, _) => { Err(ParquetError::NYI( format!( "Attempting to write an Arrow type {:?} to parquet that is not yet implemented", diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 2ca4b7ef8a79..d81d6a69bbb9 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -507,6 +507,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { let dict_field = Field::new(name, *value.clone(), field.is_nullable()); arrow_to_parquet_type(&dict_field) } + DataType::RunEndEncoded(_, _) => Err(arrow_err!("Converting RunEndEncodedType to parquet not supported",)) } }