Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for nested structs.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 29, 2021
1 parent 19c67c3 commit 2cd20fe
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 50 deletions.
36 changes: 26 additions & 10 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,34 @@ def case_struct(size):
("f1", pa.utf8()),
("f2", pa.bool_()),
]
fields = [
pa.field(
"struct",
pa.struct(struct_fields),
)
]
schema = pa.schema(fields)
schema = pa.schema(
[
pa.field(
"struct",
pa.struct(struct_fields),
),
pa.field(
"struct_struct",
pa.struct(
[
("f1", pa.struct(struct_fields)),
("f2", pa.bool_()),
]
),
),
]
)

struct = pa.StructArray.from_arrays(
[pa.array(string * size), pa.array(boolean * size)],
fields=struct_fields,
)
return (
{
"struct": pa.StructArray.from_arrays(
[pa.array(string * size), pa.array(boolean * size)],
fields=struct_fields,
"struct": struct,
"struct_struct": pa.StructArray.from_arrays(
[struct, pa.array(boolean * size)],
names=["f1", "f2"],
),
},
schema,
Expand Down
84 changes: 61 additions & 23 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(clippy::type_complexity)]

use std::{
collections::VecDeque,
convert::TryInto,
io::{Read, Seek},
sync::Arc,
Expand Down Expand Up @@ -171,15 +172,43 @@ fn dict_read<
}
}

/// Returns how many extra parquet leaf columns (beyond the first) a value of
/// `data_type` spans. Leaf-like types contribute 0; a struct flattens to the
/// sum of one column per child plus each child's own offset.
fn column_offset(data_type: &DataType) -> usize {
    use crate::datatypes::PhysicalType::*;
    match data_type.to_physical_type() {
        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
        | LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => 0,
        Struct => match data_type.to_logical_type() {
            // Each child accounts for itself (the `1`) plus whatever it nests.
            DataType::Struct(fields) => fields
                .iter()
                .map(|field| 1 + column_offset(field.data_type()))
                .sum(),
            // Physical type Struct guarantees the logical type is Struct.
            _ => unreachable!(),
        },
        Union => todo!(),
        Map => todo!(),
    }
}

fn column_datatype(data_type: &DataType, column: usize) -> DataType {
use crate::datatypes::PhysicalType::*;
match data_type.to_physical_type() {
Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
| LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => data_type.clone(),
Struct => {
// todo: this won't work for nested structs because we need to flatten the column ids
if let DataType::Struct(v) = data_type {
v[column].data_type().clone()
if let DataType::Struct(fields) = data_type.to_logical_type() {
let mut total_columns = 0;
let mut total_fields = 0;
for f in fields {
let field_columns = column_offset(f.data_type());
if column < total_columns + field_columns {
return column_datatype(f.data_type(), column + total_columns);
}
total_fields += (field_columns > 0) as usize;
total_columns += field_columns;
}
fields[column + total_fields - total_columns]
.data_type()
.clone()
} else {
unreachable!()
}
Expand Down Expand Up @@ -311,6 +340,30 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
}
}

/// Reassembles a single array of `data_type` from `arrays`, a flat queue of
/// already-deserialized leaf columns, consuming entries from the front in
/// depth-first field order.
fn finish_array(data_type: DataType, arrays: &mut VecDeque<Box<dyn Array>>) -> Box<dyn Array> {
    use crate::datatypes::PhysicalType::*;
    match data_type.to_physical_type() {
        Struct => match data_type.to_logical_type() {
            DataType::Struct(fields) => {
                // Rebuild every child first; each child pops its own leaves.
                let values: Vec<_> = fields
                    .iter()
                    .map(|field| finish_array(field.data_type().clone(), arrays).into())
                    .collect();
                Box::new(StructArray::from_data(data_type, values, None))
            }
            // Physical type Struct guarantees the logical type is Struct.
            _ => unreachable!(),
        },
        Union => todo!(),
        Map => todo!(),
        // Leaf-like types correspond 1:1 to the next deserialized column.
        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
        | LargeUtf8 | List | LargeList | FixedSizeList | Dictionary(_) => {
            arrays.pop_front().unwrap()
        }
    }
}

/// Returns an [`Array`] built from an iterator of column chunks. It also returns
/// the two buffers used to decompress and deserialize pages (to be re-used).
#[allow(clippy::type_complexity)]
Expand All @@ -326,7 +379,7 @@ where
let mut nested_info = vec![];
init_nested(columns.field(), 0, &mut nested_info);

let mut arrays = vec![];
let mut arrays = VecDeque::new();
let page_buffer;
let mut column = 0;
loop {
Expand All @@ -339,7 +392,7 @@ where
let array =
page_iter_to_array(&mut iterator, &mut nested_info, metadata, data_type)?;
buffer = iterator.into_inner();
arrays.push(array)
arrays.push_back(array)
}
column += 1;
columns = new_iter;
Expand All @@ -351,24 +404,9 @@ where
}
}

use crate::datatypes::PhysicalType::*;
Ok(match data_type.to_physical_type() {
Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
| LargeUtf8 | List | LargeList | FixedSizeList | Dictionary(_) => {
(arrays.pop().unwrap(), page_buffer, buffer)
}
Struct => (
Box::new(StructArray::from_data(
data_type,
arrays.into_iter().map(|x| x.into()).collect(),
None,
)),
page_buffer,
buffer,
),
Union => todo!(),
Map => todo!(),
})
let array = finish_array(data_type, &mut arrays);
assert!(arrays.is_empty());
Ok((array, page_buffer, buffer))
}

/// Converts an async stream of [`DataPage`] into a single [`Array`].
Expand Down
53 changes: 36 additions & 17 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,23 @@ pub fn pyarrow_nested_nullable_statistics(column: usize) -> Option<Box<dyn Stati
}

pub fn pyarrow_struct(column: usize) -> Box<dyn Array> {
let boolean = [
Some(true),
None,
Some(false),
Some(false),
None,
Some(true),
None,
None,
Some(true),
Some(true),
];
let boolean = Arc::new(BooleanArray::from(boolean.clone())) as Arc<dyn Array>;
let fields = vec![
Field::new("f1", DataType::Utf8, true),
Field::new("f2", DataType::Boolean, true),
];
match column {
0 => {
let string = [
Expand All @@ -548,32 +565,28 @@ pub fn pyarrow_struct(column: usize) -> Box<dyn Array> {
Some("def"),
Some("aaa"),
];
let boolean = [
Some(true),
None,
Some(false),
Some(false),
None,
Some(true),
None,
None,
Some(true),
Some(true),
];
let values = vec![
Arc::new(Utf8Array::<i32>::from(string)) as Arc<dyn Array>,
Arc::new(BooleanArray::from(boolean)) as Arc<dyn Array>,
];
let fields = vec![
Field::new("f1", DataType::Utf8, true),
Field::new("f2", DataType::Boolean, true),
boolean,
];
Box::new(StructArray::from_data(
DataType::Struct(fields),
values,
None,
))
}
1 => {
let struct_ = pyarrow_struct(0).into();
let values = vec![struct_, boolean];
Box::new(StructArray::from_data(
DataType::Struct(vec![
Field::new("f1", DataType::Struct(fields), true),
Field::new("f2", DataType::Boolean, true),
]),
values,
None,
))
}
_ => todo!(),
}
}
Expand All @@ -586,6 +599,12 @@ pub fn pyarrow_struct_statistics(column: usize) -> Option<Box<dyn Statistics>> {
min_value: Some("".to_string()),
max_value: Some("def".to_string()),
})),
1 => Some(Box::new(BooleanStatistics {
distinct_count: None,
null_count: Some(1),
min_value: Some(false),
max_value: Some(true),
})),
_ => todo!(),
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ fn v1_struct_optional() -> Result<()> {
test_pyarrow_integration(0, 1, "struct", false, false)
}

#[test]
// Round-trips the nested-struct column of the "struct" pyarrow fixture.
// NOTE(review): arguments appear to be (column=1, version=1, type="struct",
// use_dict=false, required=false) — confirm against test_pyarrow_integration's
// signature, which is not visible in this hunk.
fn v1_struct_struct_optional() -> Result<()> {
    test_pyarrow_integration(1, 1, "struct", false, false)
}

#[test]
fn all_types() -> Result<()> {
let path = "testing/parquet-testing/data/alltypes_plain.parquet";
Expand Down

0 comments on commit 2cd20fe

Please sign in to comment.