Skip to content

Commit

Permalink
Generify ColumnReaderImpl and RecordReader (apache#1040) (apache#1041)
Browse files Browse the repository at this point in the history
* Simplify record reader

* Generify ColumnReaderImpl and RecordReader (apache#1040)

* Tweak count_records predicate

* Pre-allocate bitmask

* fix: TypedBuffer::split update len

* Simplify GenericRecordReader

* Move column decoders into module

* Remove `RecordBuffer::create` method

* Remove `TypedBuffer<i16>::count_records`

* Pass null count to `ColumnValueDecoder::read`

* Pull null padding out of column reader

* Review feedback

* Format

* License headers

* Further doc tweaks

* Further docs

* Restrict ScalarBuffer types
  • Loading branch information
tustvold authored Jan 11, 2022
1 parent 79d4ab0 commit 06431ee
Show file tree
Hide file tree
Showing 8 changed files with 890 additions and 458 deletions.
95 changes: 63 additions & 32 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
Expand Down Expand Up @@ -62,12 +63,13 @@ use crate::arrow::converter::{
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::private::ScalarDataType;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
Expand All @@ -78,7 +80,6 @@ use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
};
use crate::schema::visitor::TypeVisitor;
use std::any::Any;

/// An array reader reads Parquet data into an Arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -111,11 +112,15 @@ pub trait ArrayReader {
///
/// Returns the number of records read, which can be less than `batch_size` if
/// the pages are exhausted.
fn read_records<T: ScalarDataType>(
record_reader: &mut RecordReader<T>,
fn read_records<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
batch_size: usize,
) -> Result<usize> {
) -> Result<usize>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;
Expand All @@ -139,7 +144,11 @@ fn read_records<T: ScalarDataType>(

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
pub struct NullArrayReader<T: ScalarDataType> {
pub struct NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
Expand All @@ -149,7 +158,11 @@ pub struct NullArrayReader<T: ScalarDataType> {
_type_marker: PhantomData<T>,
}

impl<T: ScalarDataType> NullArrayReader<T> {
impl<T> NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
/// Construct null array reader.
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
let record_reader = RecordReader::<T>::new(column_desc.clone());
Expand All @@ -167,7 +180,11 @@ impl<T: ScalarDataType> NullArrayReader<T> {
}

/// Implementation of `ArrayReader` for `NullArrayReader`.
impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {
impl<T> ArrayReader for NullArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -207,17 +224,24 @@ impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {

/// Primitive array readers are leaves of the array reader tree. They accept a page
/// iterator and read its pages into primitive arrays.
pub struct PrimitiveArrayReader<T: ScalarDataType> {
pub struct PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
column_desc: ColumnDescPtr,
record_reader: RecordReader<T>,
_type_marker: PhantomData<T>,
}

impl<T: ScalarDataType> PrimitiveArrayReader<T> {
impl<T> PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
/// Construct primitive array reader.
pub fn new(
pages: Box<dyn PageIterator>,
Expand All @@ -241,13 +265,16 @@ impl<T: ScalarDataType> PrimitiveArrayReader<T> {
rep_levels_buffer: None,
column_desc,
record_reader,
_type_marker: PhantomData,
})
}
}

/// Implementation of primitive array reader.
impl<T: ScalarDataType> ArrayReader for PrimitiveArrayReader<T> {
impl<T> ArrayReader for PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
{
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -1923,7 +1950,26 @@ impl<'a> ArrayReaderBuilder {

#[cfg(test)]
mod tests {
use super::*;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};

use arrow::array::{
Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Time32MillisecondType as ArrowTime32MillisecondArray,
Time64MicrosecondType as ArrowTime64MicrosecondArray,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
};

use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
Expand All @@ -1937,23 +1983,8 @@ mod tests {
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
use crate::util::test_common::{get_test_file, make_pages};
use arrow::array::{
Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Time32MillisecondType as ArrowTime32MillisecondArray,
Time64MicrosecondType as ArrowTime64MicrosecondArray,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
};
use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use super::*;

fn make_column_chunks<T: DataType>(
column_desc: ColumnDescPtr,
Expand Down
Loading

0 comments on commit 06431ee

Please sign in to comment.