Simplified reading parquet (#532)
jorgecarleitao authored Oct 18, 2021
1 parent 13f8d09 commit fdbaa18
Showing 28 changed files with 145 additions and 255 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -61,7 +61,7 @@ futures = { version = "0.3", optional = true }
 # for faster hashing
 ahash = { version = "0.7", optional = true }
 
-parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
 
 avro-rs = { version = "0.13", optional = true, default_features = false }
3 changes: 2 additions & 1 deletion examples/parquet_read.rs
@@ -1,11 +1,12 @@
 use std::fs::File;
+use std::io::BufReader;
 
 use arrow2::io::parquet::read;
 use arrow2::{array::Array, error::Result};
 
 fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
     // Open a file, a common operation in Rust
-    let mut file = File::open(path)?;
+    let mut file = BufReader::new(File::open(path)?);
 
     // Read the files' metadata. This has a small IO cost because it requires seeking to the end
     // of the file to read its footer.
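Note: the diff above is truncated. For context, this is roughly what the whole example looks like after the change — a sketch assembled from the hunk above and the example's pre-existing helpers (`read_metadata`, `get_schema`, `get_page_iterator`); their exact signatures are assumptions here, not shown in this diff:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
    let mut file = BufReader::new(File::open(path)?);

    // Parse the footer into file metadata (small IO cost: seek to the end).
    let file_metadata = read::read_metadata(&mut file)?;

    // Derive the arrow schema from the parquet types (CPU-only).
    let arrow_schema = read::get_schema(&file_metadata)?;

    // Iterator over the compressed pages of one column chunk
    // (argument order assumed from the example at this commit).
    let pages = read::get_page_iterator(&file_metadata, row_group, column, &mut file, None, vec![])?;

    let metadata = file_metadata.row_groups[row_group].column(column);
    let data_type = arrow_schema.fields()[column].data_type().clone();

    // New in this commit: a fallible streaming iterator that decompresses
    // pages into one reused internal buffer.
    let mut pages = read::BasicDecompressor::new(pages, vec![]);
    read::page_iter_to_array(&mut pages, metadata, data_type)
}
```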
8 changes: 3 additions & 5 deletions examples/parquet_read_parallel.rs
@@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
         let metadata_consumer = file_metadata.clone();
         let arrow_schema_consumer = arrow_schema.clone();
         let child = thread::spawn(move || {
-            let (column, row_group, iter) = rx_consumer.recv().unwrap();
+            let (column, row_group, pages) = rx_consumer.recv().unwrap();
             let start = SystemTime::now();
             println!("consumer start - {} {}", column, row_group);
             let metadata = metadata_consumer.row_groups[row_group].column(column);
             let data_type = arrow_schema_consumer.fields()[column].data_type().clone();
 
-            let pages = iter
-                .into_iter()
-                .map(|x| x.and_then(|x| read::decompress(x, &mut vec![])));
-            let mut pages = read::streaming_iterator::convert(pages);
+            let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
 
             let array = read::page_iter_to_array(&mut pages, metadata, data_type);
             println!(
                 "consumer end - {:?}: {} {}",
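This consumer-side change is the heart of the commit: the eager per-page `read::decompress` into a fresh `vec![]`, plus the `streaming_iterator::convert` adapter, collapse into a single `BasicDecompressor`. A minimal sketch of the resulting shape, using only types re-exported by `arrow2::io::parquet::read` (signatures assumed from the diff):

```rust
use arrow2::io::parquet::read::{
    BasicDecompressor, CompressedDataPage, DataPage, FallibleStreamingIterator, ParquetError,
};

/// Turn a batch of compressed pages (e.g. received over a channel) into a
/// fallible streaming iterator of decompressed pages. The `vec![]` is a
/// scratch buffer reused across every page of the column chunk, instead of
/// allocating one buffer per page as before.
fn decompressor(
    pages: Vec<Result<CompressedDataPage, ParquetError>>,
) -> impl FallibleStreamingIterator<Item = DataPage, Error = ParquetError> {
    BasicDecompressor::new(pages.into_iter(), vec![])
}
```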
31 changes: 19 additions & 12 deletions examples/parquet_write.rs
@@ -1,13 +1,15 @@
 use std::fs::File;
 use std::iter::once;
 
+use arrow2::error::ArrowError;
 use arrow2::io::parquet::write::to_parquet_schema;
 use arrow2::{
     array::{Array, Int32Array},
     datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
+        array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
+        Encoding, FallibleStreamingIterator, Version, WriteOptions,
     },
 };
 
@@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
     // map arrow fields to parquet fields
     let parquet_schema = to_parquet_schema(&schema)?;
 
-    // Declare the row group iterator. This must be an iterator of iterators of iterators:
-    // * first iterator of row groups
-    // * second iterator of column chunks
-    // * third iterator of pages
-    // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`).
-    // All column chunks within a row group MUST have the same length.
-    let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new(
-        once(array)
-            .zip(parquet_schema.columns().to_vec().into_iter())
-            .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)),
-    ))))));
+    let descriptor = parquet_schema.columns()[0].clone();
+
+    // Declare the row group iterator. This must be an iterator of iterators of streaming iterators
+    // * first iterator over row groups
+    let row_groups = once(Result::Ok(DynIter::new(
+        // * second iterator over column chunks (we assume no struct arrays -> `once` column)
+        once(
+            // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
+            array_to_pages(array, descriptor, options, encoding).map(move |pages| {
+                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
+                let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
+                    .map_err(ArrowError::from);
+                DynStreamingIterator::new(compressed_pages)
+            }),
+        ),
+    )));
 
     // Create a new empty file
     let mut file = File::create(path)?;
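The key structural point in this rewrite: the innermost level is no longer a plain iterator of pages but a fallible *streaming* iterator, so `Compressor` can yield each compressed page by reference out of a single reused buffer, and encoding errors surface through `next()?`. The shape `write_file` consumes, in pseudo-types (a sketch, not exact bounds; `CompressedPage` is assumed to be parquet2's write-side page type):

```rust
// Pseudo-type sketch of the nesting built above (illustrative only):
//
//   row_groups: Iterator<Item = Result<            // level 1: row groups
//       DynIter<Item = Result<                     // level 2: column chunks
//           DynStreamingIterator<                  // level 3: pages, produced lazily
//               Item = CompressedPage,
//               Error = ParquetError,
//           >
//       >>
//   >>
```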
6 changes: 6 additions & 0 deletions src/io/parquet/mod.rs
@@ -11,3 +11,9 @@ impl From<parquet2::error::ParquetError> for ArrowError {
         ArrowError::External("".to_string(), Box::new(error))
     }
 }
+
+impl From<ArrowError> for parquet2::error::ParquetError {
+    fn from(error: ArrowError) -> Self {
+        parquet2::error::ParquetError::General(error.to_string())
+    }
+}
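Why the reverse conversion is needed: the write path (see `examples/parquet_write.rs` above) now runs arrow2 encoding inside parquet2 adapters whose `Error = ParquetError` — e.g. `pages.map(|x| Ok(x?))` converts an `ArrowError` into a `ParquetError` via `?`. A minimal sketch of the round trip the two `From` impls enable (the three functions are hypothetical):

```rust
use arrow2::error::Result;
use parquet2::error::ParquetError;

// Hypothetical arrow2-side caller: `?` converts ParquetError -> ArrowError
// via the pre-existing `From` impl above.
fn arrow_side() -> Result<()> {
    parquet_side()?;
    Ok(())
}

// Hypothetical parquet2-side adapter: `?` converts ArrowError -> ParquetError
// via the `From` impl added in this commit.
fn parquet_side() -> std::result::Result<(), ParquetError> {
    arrow_op()?;
    Ok(())
}

// Stand-in for arrow2 work that can fail with an ArrowError.
fn arrow_op() -> Result<()> {
    Ok(())
}
```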
9 changes: 4 additions & 5 deletions src/io/parquet/read/binary/basic.rs
@@ -3,7 +3,7 @@ use parquet2::{
     encoding::{delta_length_byte_array, hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::{BinaryPageDict, DataPage},
-    read::StreamingIterator,
+    FallibleStreamingIterator,
 };
 
 use crate::{
@@ -308,17 +308,16 @@ pub fn iter_to_array<O, I, E>(
 where
     ArrowError: From<E>,
     O: Offset,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut values = MutableBuffer::<u8>::with_capacity(0);
     let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
     offsets.push(O::default());
     let mut validity = MutableBitmap::with_capacity(capacity);
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             metadata.descriptor(),
             &mut offsets,
             &mut values,
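This file shows the template applied to every reader in the commit: `I: StreamingIterator<Item = Result<DataPage, E>>` becomes `I: FallibleStreamingIterator<Item = DataPage, Error = E>`, the loop gains a `?`, and the `E: Clone` bound disappears — errors no longer have to be cloned out of a borrowed `&Result` because `next()` itself is fallible. A self-contained toy of the trait's core (as defined by the `fallible-streaming-iterator` crate that parquet2 re-exports) and the consumption pattern:

```rust
use fallible_streaming_iterator::FallibleStreamingIterator;

/// A toy source that streams borrowed items and can fail mid-stream.
struct Numbers {
    items: Vec<Result<i64, String>>,
    pos: usize,
    current: Option<i64>,
}

impl FallibleStreamingIterator for Numbers {
    type Item = i64;
    type Error = String;

    // All fallible work happens in `advance`; `get` merely borrows the
    // current item. `next()` (provided by the trait) is `advance` + `get`.
    fn advance(&mut self) -> Result<(), String> {
        self.current = match self.items.get(self.pos) {
            Some(Ok(v)) => Some(*v),
            Some(Err(e)) => return Err(e.clone()),
            None => None,
        };
        self.pos += 1;
        Ok(())
    }

    fn get(&self) -> Option<&i64> {
        self.current.as_ref()
    }
}

/// The same consumption pattern as `iter_to_array` above: `next()?`
/// propagates the error once, and each item is borrowed in place.
fn sum<I>(iter: &mut I) -> Result<i64, String>
where
    I: FallibleStreamingIterator<Item = i64, Error = String>,
{
    let mut total = 0;
    while let Some(v) = iter.next()? {
        total += v;
    }
    Ok(total)
}

fn main() -> Result<(), String> {
    let mut it = Numbers {
        items: vec![Ok(1), Ok(2), Ok(3)],
        pos: 0,
        current: None,
    };
    assert_eq!(sum(&mut it)?, 6);
    Ok(())
}
```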
9 changes: 4 additions & 5 deletions src/io/parquet/read/binary/dictionary.rs
@@ -4,7 +4,7 @@ use parquet2::{
     encoding::{hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::{BinaryPageDict, DataPage},
-    read::StreamingIterator,
+    FallibleStreamingIterator,
 };
 
 use super::super::utils as other_utils;
@@ -133,17 +133,16 @@ where
     ArrowError: From<E>,
     O: Offset,
     K: DictionaryKey,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut indices = MutableBuffer::<K>::with_capacity(capacity);
     let mut values = MutableBuffer::<u8>::with_capacity(0);
     let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
     let mut validity = MutableBitmap::with_capacity(capacity);
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             metadata.descriptor(),
             &mut indices,
             &mut offsets,
10 changes: 5 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -4,7 +4,8 @@ use parquet2::{
     encoding::{hybrid_rle::HybridRleDecoder, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::DataPage,
-    read::{levels::get_bit_width, StreamingIterator},
+    read::levels::get_bit_width,
+    FallibleStreamingIterator,
 };
 
 use super::super::nested_utils::*;
@@ -153,8 +154,7 @@ pub fn iter_to_array<O, I, E>(
 where
     O: Offset,
     ArrowError: From<E>,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut values = MutableBuffer::<u8>::with_capacity(0);
@@ -164,9 +164,9 @@ where
 
     let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity);
 
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             metadata.descriptor(),
             is_nullable,
             &mut nested,
14 changes: 4 additions & 10 deletions src/io/parquet/read/boolean/basic.rs
@@ -12,7 +12,7 @@ use parquet2::{
     encoding::{hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::DataPage,
-    read::StreamingIterator,
+    FallibleStreamingIterator,
 };
 
 pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) {
@@ -71,19 +71,13 @@ fn read_optional(
 pub fn iter_to_array<I, E>(mut iter: I, metadata: &ColumnChunkMetaData) -> Result<BooleanArray>
 where
     ArrowError: From<E>,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut values = MutableBitmap::with_capacity(capacity);
     let mut validity = MutableBitmap::with_capacity(capacity);
-    while let Some(page) = iter.next() {
-        extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
-            metadata.descriptor(),
-            &mut values,
-            &mut validity,
-        )?
+    while let Some(page) = iter.next()? {
+        extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
     }
 
     Ok(BooleanArray::from_data(
10 changes: 5 additions & 5 deletions src/io/parquet/read/boolean/nested.rs
@@ -4,7 +4,8 @@ use parquet2::{
     encoding::{hybrid_rle::HybridRleDecoder, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::DataPage,
-    read::{levels::get_bit_width, StreamingIterator},
+    read::levels::get_bit_width,
+    FallibleStreamingIterator,
 };
 
 use super::super::nested_utils::*;
@@ -137,18 +138,17 @@ pub fn iter_to_array<I, E>(
 ) -> Result<Box<dyn Array>>
 where
     ArrowError: From<E>,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut values = MutableBitmap::with_capacity(capacity);
     let mut validity = MutableBitmap::with_capacity(capacity);
 
     let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity);
 
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             metadata.descriptor(),
             is_nullable,
             &mut nested,
9 changes: 4 additions & 5 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -2,7 +2,7 @@ use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
     encoding::{hybrid_rle, Encoding},
     page::{DataPage, FixedLenByteArrayPageDict},
-    read::StreamingIterator,
+    FallibleStreamingIterator,
 };
 
 use super::{ColumnChunkMetaData, ColumnDescriptor};
@@ -134,17 +134,16 @@ pub fn iter_to_array<I, E>(
 ) -> Result<FixedSizeBinaryArray>
 where
     ArrowError: From<E>,
-    E: Clone,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
 
     let capacity = metadata.num_values() as usize;
     let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
     let mut validity = MutableBitmap::with_capacity(capacity);
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             size,
             metadata.descriptor(),
             &mut values,
10 changes: 5 additions & 5 deletions src/io/parquet/read/mod.rs
@@ -8,18 +8,20 @@ use std::{
 use futures::{AsyncRead, AsyncSeek, Stream};
 pub use parquet2::{
     error::ParquetError,
+    fallible_streaming_iterator,
     metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
         read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
-        streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
+        BasicDecompressor, Decompressor, PageFilter, PageIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
         TimeUnit as ParquetTimeUnit, TimestampType,
     },
     types::int96_to_i64_ns,
+    FallibleStreamingIterator,
 };
 
 use crate::{
@@ -82,7 +84,7 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
 
 fn dict_read<
     K: DictionaryKey,
-    I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>,
 >(
     iter: &mut I,
     metadata: &ColumnChunkMetaData,
@@ -164,9 +166,7 @@ fn dict_read<
 }
 
 /// Converts an iterator of [`DataPage`] into a single [`Array`].
-pub fn page_iter_to_array<
-    I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
->(
+pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
     iter: &mut I,
     metadata: &ColumnChunkMetaData,
     data_type: DataType,
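For downstream code, the public contract changes in the same way: anything implementing the re-exported `FallibleStreamingIterator` over `DataPage`s — `BasicDecompressor` and `Decompressor` both do — can feed `page_iter_to_array`. A caller-side sketch using only names re-exported above (`column_to_array` is a hypothetical helper):

```rust
use arrow2::array::Array;
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::parquet::read::{
    page_iter_to_array, ColumnChunkMetaData, DataPage, FallibleStreamingIterator, ParquetError,
};

/// Hypothetical helper: deserialize one column chunk from any page source
/// that satisfies the new bound.
fn column_to_array<I>(
    pages: &mut I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<Box<dyn Array>>
where
    I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>,
{
    page_iter_to_array(pages, metadata, data_type)
}
```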
9 changes: 4 additions & 5 deletions src/io/parquet/read/primitive/dictionary.rs
@@ -3,8 +3,8 @@ use std::sync::Arc;
 use parquet2::{
     encoding::{hybrid_rle, Encoding},
     page::{DataPage, PrimitivePageDict},
-    read::StreamingIterator,
     types::NativeType,
+    FallibleStreamingIterator,
 };
 
 use super::super::utils;
@@ -135,18 +135,17 @@ where
     ArrowError: From<E>,
     T: NativeType,
     K: DictionaryKey,
-    E: Clone,
     A: ArrowNativeType,
     F: Copy + Fn(T) -> A,
-    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+    I: FallibleStreamingIterator<Item = DataPage, Error = E>,
 {
     let capacity = metadata.num_values() as usize;
     let mut indices = MutableBuffer::<K>::with_capacity(capacity);
     let mut values = MutableBuffer::<A>::with_capacity(capacity);
     let mut validity = MutableBitmap::with_capacity(capacity);
-    while let Some(page) = iter.next() {
+    while let Some(page) = iter.next()? {
         extend_from_page(
-            page.as_ref().map_err(|x| x.clone())?,
+            page,
             metadata.descriptor(),
             &mut indices,
             &mut values,