Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for MapArray read and write to parquet #1419

Merged
merged 18 commits into from
Feb 28, 2023
4 changes: 3 additions & 1 deletion src/array/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ pub fn get_value_display<'a, F: Write + 'a>(
Union => Box::new(move |f, index| {
super::union::fmt::write_value(array.as_any().downcast_ref().unwrap(), index, null, f)
}),
Map => todo!(),
Map => Box::new(move |f, index| {
super::map::fmt::write_value(array.as_any().downcast_ref().unwrap(), index, null, f)
}),
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
Box::new(move |f, index| {
super::dictionary::fmt::write_value::<$T,_>(array.as_any().downcast_ref().unwrap(), index, null, f)
Expand Down
113 changes: 113 additions & 0 deletions src/array/growable/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::sync::Arc;

use crate::{
array::{Array, MapArray},
bitmap::MutableBitmap,
offset::Offsets,
};

use super::{
make_growable,
utils::{build_extend_null_bits, ExtendNullBits},
Growable,
};

/// Appends `len` slots, beginning at slot `start`, of the `index`-th source array to
/// `growable`: the offsets are extended first, then the range of child values that
/// those offsets delimit.
fn extend_offset_values(growable: &mut GrowableMap<'_>, index: usize, start: usize, len: usize) {
    // Copy the reference out so that `growable` can be mutated below.
    let source = growable.arrays[index];
    let source_offsets = source.offsets();

    growable
        .offsets
        .try_extend_from_slice(source_offsets, start, len)
        .unwrap();

    // Translate the slot range into the corresponding range of the child values.
    let raw = source_offsets.buffer();
    let first = raw[start] as usize;
    let last = raw[start + len] as usize;
    growable.values.extend(index, first, last - first);
}

/// Concrete [`Growable`] for the [`MapArray`].
///
/// Accumulates offsets, validity and child values copied from a fixed set of
/// source arrays, and produces a [`MapArray`] from them.
pub struct GrowableMap<'a> {
/// Source arrays; the `index` passed to `extend` selects one of them.
arrays: Vec<&'a MapArray>,
/// Validity bits accumulated so far.
validity: MutableBitmap,
/// Growable over the `field()` of every source array, in the same order as `arrays`.
values: Box<dyn Growable<'a> + 'a>,
/// Offsets accumulated so far.
offsets: Offsets<i32>,
/// One validity-extending closure per source array, in the same order as `arrays`.
extend_null_bits: Vec<ExtendNullBits<'a>>,
}

impl<'a> GrowableMap<'a> {
    /// Creates a new [`GrowableMap`] bound to `arrays` with a pre-allocated `capacity`.
    ///
    /// `capacity` is used to pre-allocate the offsets and the validity; the growable of
    /// the child values is created with a capacity of `0`.
    /// `use_validity` is forced to `true` when any of `arrays` has a non-zero null count.
    /// # Panics
    /// If `arrays` is empty.
    pub fn new(arrays: Vec<&'a MapArray>, mut use_validity: bool, capacity: usize) -> Self {
        // if any of the arrays has nulls, insertions from any array requires setting bits
        // as there is at least one array with nulls.
        // `||` short-circuits: the null counts are only inspected when validity was not
        // already requested by the caller.
        use_validity = use_validity || arrays.iter().any(|array| array.null_count() > 0);

        let extend_null_bits = arrays
            .iter()
            .map(|array| build_extend_null_bits(*array, use_validity))
            .collect();

        // The child values of every source array, in the same order as `arrays`.
        let inner = arrays
            .iter()
            .map(|array| array.field().as_ref())
            .collect::<Vec<_>>();
        let values = make_growable(&inner, use_validity, 0);

        Self {
            arrays,
            offsets: Offsets::with_capacity(capacity),
            values,
            validity: MutableBitmap::with_capacity(capacity),
            extend_null_bits,
        }
    }

    /// Builds a [`MapArray`] from the accumulated state.
    ///
    /// The validity and offsets are moved out (leaving their `Default` values behind) and
    /// the data type is cloned from the first source array.
    fn to(&mut self) -> MapArray {
        let validity = std::mem::take(&mut self.validity);
        let offsets = std::mem::take(&mut self.offsets);
        let values = self.values.as_box();

        MapArray::new(
            self.arrays[0].data_type().clone(),
            offsets.into(),
            values,
            validity.into(),
        )
    }
}

impl<'a> Growable<'a> for GrowableMap<'a> {
    /// Copies `len` slots, starting at slot `start`, from the `index`-th source array:
    /// validity first, then offsets and child values.
    fn extend(&mut self, index: usize, start: usize, len: usize) {
        let extend_nulls = &self.extend_null_bits[index];
        extend_nulls(&mut self.validity, start, len);
        extend_offset_values(self, index, start, len);
    }

    /// Extends the offsets through `extend_constant` and appends `additional` unset
    /// (`false`) validity bits.
    fn extend_validity(&mut self, additional: usize) {
        self.offsets.extend_constant(additional);
        self.validity.extend_constant(additional, false);
    }

    /// Length derived from the accumulated offsets.
    #[inline]
    fn len(&self) -> usize {
        let offsets_len = self.offsets.len();
        offsets_len - 1
    }

    /// Builds the accumulated [`MapArray`] and wraps it in an [`Arc`].
    fn as_arc(&mut self) -> Arc<dyn Array> {
        let array = self.to();
        Arc::new(array)
    }

    /// Builds the accumulated [`MapArray`] and wraps it in a [`Box`].
    fn as_box(&mut self) -> Box<dyn Array> {
        let array = self.to();
        Box::new(array)
    }
}

impl<'a> From<GrowableMap<'a>> for MapArray {
    /// Consumes the growable and returns the [`MapArray`] built from its accumulated state.
    fn from(val: GrowableMap<'a>) -> Self {
        let mut growable = val;
        growable.to()
    }
}
4 changes: 3 additions & 1 deletion src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mod primitive;
pub use primitive::GrowablePrimitive;
mod list;
pub use list::GrowableList;
mod map;
pub use map::GrowableMap;
mod structure;
pub use structure::GrowableStruct;
mod fixed_size_list;
Expand Down Expand Up @@ -123,7 +125,7 @@ pub fn make_growable<'a>(
.collect::<Vec<_>>();
Box::new(union::GrowableUnion::new(arrays, capacity))
}
Map => todo!(),
Map => dyn_growable!(map::GrowableMap, arrays, use_validity, capacity),
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
let arrays = arrays
Expand Down
2 changes: 1 addition & 1 deletion src/array/map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use super::{new_empty_array, specification::try_check_offsets_bounds, Array};

mod ffi;
mod fmt;
pub(super) mod fmt;
mod iterator;
pub use iterator::*;

Expand Down
37 changes: 36 additions & 1 deletion src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use parquet2::read::get_page_iterator as _get_page_iterator;
use parquet2::schema::types::PrimitiveType;

use crate::{
array::{Array, DictionaryKey, FixedSizeListArray, ListArray},
array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray},
datatypes::{DataType, Field, IntervalUnit},
error::Result,
offset::Offsets,
Expand Down Expand Up @@ -87,6 +87,33 @@ pub fn create_list(
}
}

/// Creates a new [`MapArray`].
///
/// The innermost level is popped from `nested` and supplies the offsets and the optional
/// validity; `values.len()` is pushed as the closing offset before the offsets are
/// narrowed from `i64` to `i32`.
/// # Panics
/// * If `nested` has no level left to pop.
/// * If any offset (including `values.len()`) does not fit in an `i32`.
/// * If the narrowed offsets are rejected by the conversion into [`Offsets<i32>`].
/// * If `data_type` is not logically a [`DataType::Map`].
pub fn create_map(
    data_type: DataType,
    nested: &mut NestedState,
    values: Box<dyn Array>,
) -> Box<dyn Array> {
    let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
    match data_type.to_logical_type() {
        DataType::Map(_, _) => {
            offsets.push(values.len() as i64);
            // Checked narrowing: an `as i32` cast would silently wrap out-of-range
            // offsets instead of reporting them.
            let offsets = offsets
                .iter()
                .map(|x| -> i32 {
                    (*x).try_into()
                        .expect("i64 offsets do not fit in i32 offsets")
                })
                .collect::<Vec<_>>();

            let offsets: Offsets<i32> = offsets
                .try_into()
                .expect("offsets do not form a valid `Offsets<i32>`");

            Box::new(MapArray::new(
                data_type,
                offsets.into(),
                values,
                validity.and_then(|x| x.into()),
            ))
        }
        _ => unreachable!(),
    }
}

fn is_primitive(data_type: &DataType) -> bool {
matches!(
data_type.to_physical_type(),
Expand Down Expand Up @@ -147,6 +174,14 @@ pub fn n_columns(data_type: &DataType) -> usize {
unreachable!()
}
}
Map => {
let a = data_type.to_logical_type();
if let DataType::Map(inner, _) = a {
n_columns(&inner.data_type)
} else {
unreachable!()
}
}
Struct => {
if let DataType::Struct(fields) = data_type.to_logical_type() {
fields.iter().map(|inner| n_columns(&inner.data_type)).sum()
Expand Down
17 changes: 6 additions & 11 deletions src/io/parquet/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use parquet2::schema::types::PrimitiveType;

use crate::{
array::MapArray,
datatypes::{DataType, Field},
error::{Error, Result},
};
Expand Down Expand Up @@ -283,16 +282,12 @@ where
num_rows,
chunk_size,
)?;
Box::new(iter.map(move |x| {
let (nested, inner) = x?;
let array = MapArray::new(
field.data_type().clone(),
vec![0, inner.len() as i32].try_into().unwrap(),
inner,
None,
);
Ok((nested, array.boxed()))
}))
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_map(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
}
other => {
return Err(Error::nyi(format!(
Expand Down
7 changes: 7 additions & 0 deletions src/io/parquet/read/indexes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ fn deserialize(
unreachable!()
}
}
PhysicalType::Map => {
if let DataType::Map(inner, _) = data_type.to_logical_type() {
deserialize(indexes, inner.data_type.clone())
} else {
unreachable!()
}
}
PhysicalType::Struct => {
let children_fields = if let DataType::Struct(children) = data_type.to_logical_type() {
children
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use parquet2::{
use crate::{array::Array, error::Result};

pub use deserialize::{
column_iter_to_arrays, create_list, get_page_iterator, init_nested, n_columns, InitNested,
NestedArrayIter, NestedState, StructIterator,
column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
InitNested, NestedArrayIter, NestedState, StructIterator,
};
pub use file::{FileReader, RowGroupReader};
pub use row_group::*;
Expand Down
14 changes: 13 additions & 1 deletion src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,20 @@ fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
unreachable!()
}
}
Map => {
if let DataType::Map(field, _) = data_type.to_logical_type() {
if let DataType::Struct(fields) = field.data_type.to_logical_type() {
for field in fields {
transverse_recursive(&field.data_type, map.clone(), encodings)
}
} else {
unreachable!()
}
} else {
unreachable!()
}
}
Union => todo!(),
Map => todo!(),
}
}

Expand Down
Loading