
Support empty projection in ParquetRecordBatchReader #1560

Merged 4 commits on Apr 15, 2022

Changes from 1 commit
8 changes: 8 additions & 0 deletions parquet/src/arrow/array_reader.rs
@@ -55,6 +55,7 @@ mod builder;
mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod empty_array;
mod offset_buffer;

#[cfg(test)]
@@ -97,6 +98,9 @@ pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;

/// Get the number of rows in this collection
fn num_rows(&self) -> usize;

/// Returns an iterator over the column chunks for particular column
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
@@ -106,6 +110,10 @@ impl RowGroupCollection for Arc<dyn FileReader> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
}

fn num_rows(&self) -> usize {
self.metadata().file_metadata().num_rows() as usize
}

fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
15 changes: 6 additions & 9 deletions parquet/src/arrow/array_reader/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;

use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};

use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -37,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;

@@ -64,10 +65,6 @@ where
filtered_root_names.insert(root.name().to_string());
}

if leaves.is_empty() {
return Err(general_err!("Can't build array reader without columns!"));
}

// Only pass root fields that take part in the projection
// to avoid traversal of columns that are not read.
// TODO: also prune unread parts of the tree in child structures
@@ -412,10 +409,10 @@ impl<'a> ArrayReaderBuilder {
fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
let context = ArrayReaderBuilderContext::default();

self.visit_struct(self.root_schema.clone(), &context)
.and_then(|reader_opt| {
reader_opt.ok_or_else(|| general_err!("Failed to build array reader!"))
})
match self.visit_struct(self.root_schema.clone(), &context)? {
Some(reader) => Ok(reader),
None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
}
}

// Utility functions
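The builder change above swaps an error for a fallback: when the projection selects no columns, the schema visitor returns `None` and an empty-array reader carrying only the row count is built instead. A minimal stdlib-only sketch of that pattern (`visit`, `build_array_reader`, and the string "readers" are illustrative stand-ins, not the actual parquet APIs):

```rust
// Illustrative stand-in for the schema visitor: yields no reader
// when the projection is empty.
fn visit(columns: &[&str]) -> Option<String> {
    (!columns.is_empty()).then(|| format!("reader over {:?}", columns))
}

// Instead of erroring on None, fall back to an "empty" reader that
// only carries the row count (mirroring make_empty_array_reader).
fn build_array_reader(columns: &[&str], num_rows: usize) -> String {
    match visit(columns) {
        Some(reader) => reader,
        None => format!("empty reader with {} rows", num_rows),
    }
}

fn main() {
    println!("{}", build_array_reader(&[], 8));
    println!("{}", build_array_reader(&["id", "name"], 8));
}
```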
58 changes: 58 additions & 0 deletions parquet/src/arrow/array_reader/empty_array.rs
@@ -0,0 +1,58 @@
use crate::arrow::array_reader::ArrayReader;
use crate::errors::Result;
use arrow::array::{ArrayDataBuilder, ArrayRef, StructArray};
use arrow::datatypes::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that yields [`StructArray`] with no columns
/// but with row counts that correspond to the amount of data in the file
///
/// This is useful when a projection eliminates all columns within a collection
pub fn make_empty_array_reader(row_count: usize) -> Box<dyn ArrayReader> {
[Review comment — Contributor (author)]: I don't like this name, but it was the best I could come up with.

Box::new(EmptyArrayReader::new(row_count))
}

struct EmptyArrayReader {
data_type: ArrowType,
[Review comment — Member]: data_type seems unnecessary to keep here; it is always ArrowType::Struct(vec![]).

[Reply — Contributor (author)]: It's needed for get_data_type(&self) -> &ArrowType, which must return a reference whose lifetime is coupled to the object itself.

[Reply — Member]: Yeah, just found that and was about to leave the same comment; you already replied. Never mind.

remaining_rows: usize,
}

impl EmptyArrayReader {
pub fn new(row_count: usize) -> Self {
Self {
data_type: ArrowType::Struct(vec![]),
remaining_rows: row_count,
}
}
}

impl ArrayReader for EmptyArrayReader {
fn as_any(&self) -> &dyn Any {
self
}

fn get_data_type(&self) -> &ArrowType {
&self.data_type
}

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
let len = self.remaining_rows.min(batch_size);
self.remaining_rows -= len;

let data = ArrayDataBuilder::new(self.data_type.clone())
.len(len)
.build()
.unwrap();

Ok(Arc::new(StructArray::from(data)))
}

fn get_def_levels(&self) -> Option<&[i16]> {
None
}

fn get_rep_levels(&self) -> Option<&[i16]> {
None
}
}
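The data_type review thread above comes down to a borrow-checker constraint: get_data_type returns &ArrowType, so the struct must own a value to borrow from, even though that value is always Struct(vec![]). A stdlib-only sketch of the pattern (DataType and EmptyReader are stand-ins for the arrow types, not the actual API):

```rust
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    // Stand-in for arrow's ArrowType::Struct
    Struct(Vec<String>),
}

struct EmptyReader {
    // Stored so a borrow can be handed out below; constructing the value
    // inside get_data_type would borrow a temporary and not compile.
    data_type: DataType,
}

impl EmptyReader {
    fn new() -> Self {
        Self { data_type: DataType::Struct(Vec::new()) }
    }

    // The returned reference lives as long as `&self`.
    fn get_data_type(&self) -> &DataType {
        &self.data_type
    }
}

fn main() {
    let reader = EmptyReader::new();
    assert_eq!(reader.get_data_type(), &DataType::Struct(Vec::new()));
    println!("ok");
}
```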
47 changes: 27 additions & 20 deletions parquet/src/arrow/arrow_reader.rs
@@ -19,17 +19,18 @@

use std::sync::Arc;

use arrow::array::Array;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{array::StructArray, error::ArrowError};

use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::FileReader;

@@ -183,20 +184,10 @@ impl Iterator for ParquetRecordBatchReader {
"Struct array reader should return struct array".to_string(),
)
});

match struct_array {
Err(err) => Some(Err(err)),
Ok(e) => {
match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) {
Err(err) => Some(Err(err)),
Ok(record_batch) => {
if record_batch.num_rows() > 0 {
Some(Ok(record_batch))
} else {
None
}
}
}
}
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
}
}
}
@@ -214,12 +205,6 @@ impl ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
// Check that array reader is struct array reader
array_reader
.as_any()
.downcast_ref::<StructArrayReader>()
.ok_or_else(|| general_err!("The input must be struct array reader!"))?;

[Review comment — Contributor (author)]: This check does not seem necessary; all the caller cares about is that the reader yields a StructArray.

let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
@@ -1238,4 +1223,26 @@ mod tests {
let val = list.value(0);
assert_eq!(val.len(), 0);
}

#[test]
fn test_empty_projection() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let file = File::open(&path).unwrap();
let reader = SerializedFileReader::try_from(file).unwrap();
let expected_rows = reader.metadata().file_metadata().num_rows() as usize;

let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();

let mut total_rows = 0;
for maybe_batch in batch_reader {
let batch = maybe_batch.unwrap();
total_rows += batch.num_rows();
assert_eq!(batch.num_columns(), 0);
assert!(batch.num_rows() <= 2);
}

assert_eq!(total_rows, expected_rows);
}
}
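The rewritten `next` above relies on `(e.len() > 0).then(|| ...)` to map an empty batch to `None` and end iteration, which is the same shape as EmptyArrayReader::next_batch counting down remaining_rows. A stdlib-only sketch of that termination idiom (CountdownBatches is an illustrative stand-in for the record batch reader, with each item standing in for a batch's row count):

```rust
struct CountdownBatches {
    remaining: usize,
    batch_size: usize,
}

impl Iterator for CountdownBatches {
    type Item = usize; // stand-in for a RecordBatch; the item is its row count

    fn next(&mut self) -> Option<usize> {
        // Mirrors EmptyArrayReader::next_batch: hand out up to batch_size
        // rows per call, then terminate by mapping an empty batch to None.
        let len = self.remaining.min(self.batch_size);
        self.remaining -= len;
        (len > 0).then(|| len)
    }
}

fn main() {
    // 8 rows read 3 at a time: two full batches and one partial, then None.
    let batches: Vec<usize> = CountdownBatches { remaining: 8, batch_size: 3 }.collect();
    assert_eq!(batches, vec![3, 3, 2]);
    println!("{:?}", batches);
}
```

This is also why the new test asserts `batch.num_rows() <= 2` with a batch size of 2 and checks that the total equals the file's row count.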