
Commit f20d806: Fixed error in indexing.

jorgecarleitao committed Oct 24, 2021 (1 parent: b816c7a)
Showing 3 changed files with 45 additions and 11 deletions.
31 changes: 26 additions & 5 deletions src/io/parquet/read/mod.rs
@@ -166,7 +166,26 @@ fn dict_read<
     }
 }
 
-/// Returns an Array built from an iterator of column chunks
+fn column_datatype(data_type: &DataType, column: usize) -> DataType {
+    use crate::datatypes::PhysicalType::*;
+    match data_type.to_physical_type() {
+        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
+        | LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => data_type.clone(),
+        Struct => {
+            // todo: this won't work for nested structs because we need to flatten the column ids
+            if let DataType::Struct(v) = data_type {
+                v[column].data_type().clone()
+            } else {
+                unreachable!()
+            }
+        }
+        Union => todo!(),
+        Map => todo!(),
+    }
+}
+
+/// Returns an [`Array`] built from an iterator of column chunks. It also returns
+/// the two buffers used to decompress and deserialize pages (to be re-used).
 #[allow(clippy::type_complexity)]
 pub fn column_iter_to_array<II, I>(
     mut columns: I,
@@ -179,16 +198,19 @@ where
 {
     let mut arrays = vec![];
     let page_buffer;
+    let mut column = 0;
     loop {
         match columns.advance()? {
             State::Some(mut new_iter) => {
+                let data_type = column_datatype(&data_type, column);
                 if let Some((pages, metadata)) = new_iter.get() {
-                    let data_type = schema::to_data_type(metadata.descriptor().type_())?.unwrap();
+                    println!("{:?}", data_type);
                     let mut iterator = BasicDecompressor::new(pages, buffer);
                     let array = page_iter_to_array(&mut iterator, metadata, data_type)?;
                     buffer = iterator.into_inner();
                     arrays.push(array)
                 }
+                column += 1;
                 columns = new_iter;
             }
             State::Finished(b) => {
@@ -200,9 +222,8 @@ where
 
     use crate::datatypes::PhysicalType::*;
     Ok(match data_type.to_physical_type() {
-        Null => todo!(),
-        Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 | LargeUtf8
-        | List | LargeList | FixedSizeList | Dictionary(_) => {
+        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
+        | LargeUtf8 | List | LargeList | FixedSizeList | Dictionary(_) => {
             (arrays.pop().unwrap(), page_buffer, buffer)
         }
         Struct => todo!(),
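
The crux of the `mod.rs` change: when `column_iter_to_array` reads a struct, each column chunk must be deserialized with the data type of the corresponding child field rather than with the struct's own type, which is what the new `column_datatype` helper resolves. The sketch below reproduces that mapping in a self-contained form; the `DataType` and `Field` definitions are simplified stand-ins, not arrow2's actual types (the real function matches on `PhysicalType`, leaves `Union` and `Map` as `todo!()`, and per its own todo comment does not yet flatten column ids for nested structs).

```rust
// Simplified stand-ins for arrow2's `crate::datatypes` types.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Struct(Vec<Field>),
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

/// Resolves the data type that column chunk `column` should be
/// deserialized into: non-nested types map to themselves, while a
/// struct maps to the type of its `column`-th child field.
fn column_datatype(data_type: &DataType, column: usize) -> DataType {
    match data_type {
        DataType::Struct(fields) => fields[column].data_type.clone(),
        other => other.clone(),
    }
}

fn main() {
    let struct_type = DataType::Struct(vec![
        Field { name: "a".into(), data_type: DataType::Int32 },
        Field { name: "b".into(), data_type: DataType::Utf8 },
    ]);
    // Chunk 0 of the struct is decoded as Int32, chunk 1 as Utf8 ...
    assert_eq!(column_datatype(&struct_type, 0), DataType::Int32);
    assert_eq!(column_datatype(&struct_type, 1), DataType::Utf8);
    // ... while a non-nested type ignores the column index entirely.
    assert_eq!(column_datatype(&DataType::Utf8, 1), DataType::Utf8);
}
```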
22 changes: 17 additions & 5 deletions src/io/parquet/read/record_batch.rs
@@ -5,7 +5,7 @@ use std::{
 
 use crate::{
     datatypes::{Field, Schema},
-    error::Result,
+    error::{ArrowError, Result},
     record_batch::RecordBatch,
 };
 
@@ -20,6 +20,7 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
 pub struct RecordReader<R: Read + Seek> {
     reader: R,
     schema: Arc<Schema>,
+    indices: Vec<usize>,
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: Option<GroupFilter>,
@@ -44,23 +45,32 @@ impl<R: Read + Seek> RecordReader<R> {
         let schema = get_schema(&metadata)?;
 
         let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
+        let (indices, fields): (Vec<usize>, Vec<Field>) = if let Some(projection) = &projection {
             schema
                 .fields
                 .into_iter()
                 .enumerate()
                 .filter_map(|(index, f)| {
                     if projection.iter().any(|&i| i == index) {
-                        Some(f)
+                        Some((index, f))
                     } else {
                         None
                     }
                 })
-                .collect()
+                .unzip()
         } else {
-            schema.fields.into_iter().collect()
+            schema.fields.into_iter().enumerate().unzip()
         };
 
+        if let Some(projection) = &projection {
+            if indices.len() != projection.len() {
+                return Err(ArrowError::InvalidArgumentError(
+                    "While reading parquet, some columns in the projection do not exist in the file"
+                        .to_string(),
+                ));
+            }
+        }
+
         let schema = Arc::new(Schema {
             fields,
             metadata: schema_metadata,
@@ -69,6 +79,7 @@ impl<R: Read + Seek> RecordReader<R> {
         Ok(Self {
             reader,
             schema,
+            indices,
             groups_filter,
             pages_filter,
             metadata,
@@ -129,6 +140,7 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
         let a = schema.fields().iter().enumerate().try_fold(
             (b1, b2, Vec::with_capacity(schema.fields().len())),
             |(b1, b2, mut columns), (field_index, field)| {
+                let field_index = self.indices[field_index]; // project into the original schema
                 let column_iter = get_column_iterator(
                     &mut self.reader,
                     &self.metadata,
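
The `record_batch.rs` change has two parts: `try_new` now records the original index of every projected field (and rejects projections that reference columns the file does not have), and the iterator uses those stored `indices` to map each position in the projected schema back to a column of the file. Below is a minimal sketch of the `try_new` logic under simplified assumptions: plain `String`s stand in for arrow2's `Field`, and a `String` error replaces `ArrowError::InvalidArgumentError`.

```rust
/// Splits a schema's fields into (original indices, kept fields),
/// honouring an optional projection of column indices.
fn project(
    fields: Vec<String>,
    projection: Option<&[usize]>,
) -> Result<(Vec<usize>, Vec<String>), String> {
    let (indices, kept): (Vec<usize>, Vec<String>) = if let Some(projection) = projection {
        fields
            .into_iter()
            .enumerate()
            // Keep the index alongside the field so it can later be used
            // to project back into the original (file) schema.
            .filter_map(|(index, f)| projection.contains(&index).then(|| (index, f)))
            .unzip()
    } else {
        fields.into_iter().enumerate().unzip()
    };

    // Every requested index must have matched a field; a length mismatch
    // means the projection referenced a non-existent column.
    if let Some(projection) = projection {
        if indices.len() != projection.len() {
            return Err("some columns in the projection do not exist in the file".to_string());
        }
    }
    Ok((indices, kept))
}

fn main() {
    let fields = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // Projecting columns 0 and 2 keeps their original indices around.
    assert_eq!(
        project(fields.clone(), Some(&[0, 2])),
        Ok((vec![0, 2], vec!["a".to_string(), "c".to_string()]))
    );
    // Index 9 does not exist in the file, so the projection is rejected.
    assert!(project(fields, Some(&[0, 9])).is_err());
}
```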
3 changes: 2 additions & 1 deletion tests/it/io/parquet/read.rs
@@ -297,7 +297,8 @@ fn v2_decimal_26_required() -> Result<()> {
     test_pyarrow_integration(8, 2, "basic", false, true)
 }
 
-fn v1_struct() -> Result<()> {
+#[test]
+fn v1_struct_optional() -> Result<()> {
     test_pyarrow_integration(0, 1, "struct", false, false)
 }
 
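
The test-file change is small but consequential: without a `#[test]` attribute, `cargo test` never registers a function with the harness, so the old `v1_struct` was silently skipped rather than failing. A minimal illustration with placeholder names:

```rust
// Never runs under `cargo test`: this is an ordinary private function.
#[allow(dead_code)]
fn forgotten_check() {
    assert_eq!(1 + 1, 2);
}

// Registered with the test harness and executed by `cargo test`.
#[test]
fn executed_check() {
    assert_eq!(1 + 1, 2);
}
```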
