Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generify ColumnReaderImpl and RecordReader (#1040) #1041

Merged
merged 20 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
Expand Down Expand Up @@ -62,12 +63,13 @@ use crate::arrow::converter::{
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::private::ScalarDataType;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
Expand All @@ -78,7 +80,6 @@ use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
};
use crate::schema::visitor::TypeVisitor;
use std::any::Any;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -111,11 +112,15 @@ pub trait ArrayReader {
///
/// Returns the number of records read, which can be less than batch_size if
/// the pages are exhausted.
fn read_records<T: ScalarDataType>(
record_reader: &mut RecordReader<T>,
fn read_records<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
batch_size: usize,
) -> Result<usize> {
) -> Result<usize>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;
Expand All @@ -139,7 +144,11 @@ fn read_records<T: ScalarDataType>(

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
pub struct NullArrayReader<T: ScalarDataType> {
pub struct NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
Expand All @@ -149,7 +158,11 @@ pub struct NullArrayReader<T: ScalarDataType> {
_type_marker: PhantomData<T>,
}

impl<T: ScalarDataType> NullArrayReader<T> {
impl<T> NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
/// Construct null array reader.
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
let record_reader = RecordReader::<T>::new(column_desc.clone());
Expand All @@ -167,7 +180,11 @@ impl<T: ScalarDataType> NullArrayReader<T> {
}

/// Implementation of the `ArrayReader` trait for `NullArrayReader`.
impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {
impl<T> ArrayReader for NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -207,17 +224,24 @@ impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {

/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T: ScalarDataType> {
pub struct PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
column_desc: ColumnDescPtr,
record_reader: RecordReader<T>,
_type_marker: PhantomData<T>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed to be an orphan so I just removed it

}

impl<T: ScalarDataType> PrimitiveArrayReader<T> {
impl<T> PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
/// Construct primitive array reader.
pub fn new(
pages: Box<dyn PageIterator>,
Expand All @@ -241,13 +265,16 @@ impl<T: ScalarDataType> PrimitiveArrayReader<T> {
rep_levels_buffer: None,
column_desc,
record_reader,
_type_marker: PhantomData,
})
}
}

/// Implementation of primitive array reader.
impl<T: ScalarDataType> ArrayReader for PrimitiveArrayReader<T> {
impl<T> ArrayReader for PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -1923,7 +1950,26 @@ impl<'a> ArrayReaderBuilder {

#[cfg(test)]
mod tests {
use super::*;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};

use arrow::array::{
Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Time32MillisecondType as ArrowTime32MillisecondArray,
Time64MicrosecondType as ArrowTime64MicrosecondArray,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
};

use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
Expand All @@ -1937,23 +1983,8 @@ mod tests {
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
use crate::util::test_common::{get_test_file, make_pages};
use arrow::array::{
Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Time32MillisecondType as ArrowTime32MillisecondArray,
Time64MicrosecondType as ArrowTime64MicrosecondArray,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
};
use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use super::*;

fn make_column_chunks<T: DataType>(
column_desc: ColumnDescPtr,
Expand Down
Loading