diff --git a/Cargo.toml b/Cargo.toml index 20a11dc9061..73642d0f39c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 1e0183ba1e3..8ab3ccdac2c 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -1,11 +1,12 @@ use std::fs::File; +use std::io::BufReader; use arrow2::io::parquet::read; use arrow2::{array::Array, error::Result}; fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result> { // Open a file, a common operation in Rust - let mut file = File::open(path)?; + let mut file = BufReader::new(File::open(path)?); // Read the files' metadata. This has a small IO cost because it requires seeking to the end // of the file to read its footer. diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index defaf6bb411..9dead6c43ac 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result>> { let metadata_consumer = file_metadata.clone(); let arrow_schema_consumer = arrow_schema.clone(); let child = thread::spawn(move || { - let (column, row_group, iter) = rx_consumer.recv().unwrap(); + let (column, row_group, pages) = rx_consumer.recv().unwrap(); let start = SystemTime::now(); println!("consumer start - {} {}", column, row_group); let metadata = metadata_consumer.row_groups[row_group].column(column); let data_type = arrow_schema_consumer.fields()[column].data_type().clone(); - let pages = iter - .into_iter() - .map(|x| x.and_then(|x| read::decompress(x, &mut vec![]))); - let mut pages = read::streaming_iterator::convert(pages); + let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]); + let array = read::page_iter_to_array(&mut pages, metadata, data_type); println!( "consumer end - {:?}: {} {}", diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index d6df8d736c2..888fce79242 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -1,13 +1,15 @@ use std::fs::File; use std::iter::once; +use arrow2::error::ArrowError; use arrow2::io::parquet::write::to_parquet_schema; use arrow2::{ array::{Array, Int32Array}, datatypes::{Field, Schema}, error::Result, io::parquet::write::{ - array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions, + array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator, + Encoding, FallibleStreamingIterator, Version, WriteOptions, }, }; @@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> // map arrow fields to parquet fields let parquet_schema = to_parquet_schema(&schema)?; - // Declare the row group iterator. This must be an iterator of iterators of iterators: - // * first iterator of row groups - // * second iterator of column chunks - // * third iterator of pages - // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`). - // All column chunks within a row group MUST have the same length. 
- let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new( - once(array) - .zip(parquet_schema.columns().to_vec().into_iter()) - .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)), - )))))); + let descriptor = parquet_schema.columns()[0].clone(); + + // Declare the row group iterator. This must be an iterator of iterators of streaming iterators + // * first iterator over row groups + let row_groups = once(Result::Ok(DynIter::new( + // * second iterator over column chunks (we assume no struct arrays -> `once` column) + once( + // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array. + array_to_pages(array, descriptor, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }), + ), + ))); // Create a new empty file let mut file = File::create(path)?; diff --git a/src/io/parquet/mod.rs b/src/io/parquet/mod.rs index 6c01e6e6bc4..ad9f28f3ae9 100644 --- a/src/io/parquet/mod.rs +++ b/src/io/parquet/mod.rs @@ -11,3 +11,9 @@ impl From for ArrowError { ArrowError::External("".to_string(), Box::new(error)) } } + +impl From for parquet2::error::ParquetError { + fn from(error: ArrowError) -> Self { + parquet2::error::ParquetError::General(error.to_string()) + } +} diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 44d52857e99..655e77144eb 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -3,7 +3,7 @@ use parquet2::{ encoding::{delta_length_byte_array, hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, - read::StreamingIterator, + FallibleStreamingIterator, }; use crate::{ @@ -308,17 +308,16 @@ pub fn iter_to_array( where ArrowError: From, O: Offset, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); let mut offsets = MutableBuffer::::with_capacity(1 + capacity); offsets.push(O::default()); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut offsets, &mut values, diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index b32616069fc..6e650974f42 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -4,7 +4,7 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, - read::StreamingIterator, + FallibleStreamingIterator, }; use super::super::utils as other_utils; @@ -133,17 +133,16 @@ where ArrowError: From, O: Offset, K: DictionaryKey, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut indices = MutableBuffer::::with_capacity(capacity); let mut values = MutableBuffer::::with_capacity(0); let mut offsets = MutableBuffer::::with_capacity(1 + capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? 
{ extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut indices, &mut offsets, diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 211a79ef958..0482074c480 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -4,7 +4,8 @@ use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::{levels::get_bit_width, StreamingIterator}, + read::levels::get_bit_width, + FallibleStreamingIterator, }; use super::super::nested_utils::*; @@ -153,8 +154,7 @@ pub fn iter_to_array( where O: Offset, ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(0); @@ -164,9 +164,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 914b2921b9a..128caaac8e9 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -12,7 +12,7 @@ use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::StreamingIterator, + FallibleStreamingIterator, }; pub(super) fn read_required(buffer: &[u8], additional: usize, values: &mut MutableBitmap) { @@ -71,19 +71,13 @@ fn read_optional( pub fn iter_to_array(mut iter: I, metadata: &ColumnChunkMetaData) -> Result where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { - extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - )? + while let Some(page) = iter.next()? { + extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } Ok(BooleanArray::from_data( diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index fc48477ea88..5d99bbb2f7d 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -4,7 +4,8 @@ use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::DataPage, - read::{levels::get_bit_width, StreamingIterator}, + read::levels::get_bit_width, + FallibleStreamingIterator, }; use super::super::nested_utils::*; @@ -137,8 +138,7 @@ pub fn iter_to_array( ) -> Result> where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBitmap::with_capacity(capacity); @@ -146,9 +146,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? 
{ extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs index 2a02524785b..9552b2b1082 100644 --- a/src/io/parquet/read/fixed_size_binary.rs +++ b/src/io/parquet/read/fixed_size_binary.rs @@ -2,7 +2,7 @@ use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ encoding::{hybrid_rle, Encoding}, page::{DataPage, FixedLenByteArrayPageDict}, - read::StreamingIterator, + FallibleStreamingIterator, }; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -134,17 +134,16 @@ pub fn iter_to_array( ) -> Result where ArrowError: From, - E: Clone, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let size = *FixedSizeBinaryArray::get_size(&data_type) as usize; let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity * size); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? { extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, size, metadata.descriptor(), &mut values, diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index bb05a744633..1f6198c1e69 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -8,18 +8,20 @@ use std::{ use futures::{AsyncRead, AsyncSeek, Stream}; pub use parquet2::{ error::ParquetError, + fallible_streaming_iterator, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, page::{CompressedDataPage, DataPage, DataPageHeader}, read::{ decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream, read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, - streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator, + BasicDecompressor, Decompressor, PageFilter, PageIterator, }, schema::types::{ LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType, }, types::int96_to_i64_ns, + FallibleStreamingIterator, }; use crate::{ @@ -82,7 +84,7 @@ pub async fn read_metadata_async( fn dict_read< K: DictionaryKey, - I: StreamingIterator>, + I: FallibleStreamingIterator, >( iter: &mut I, metadata: &ColumnChunkMetaData, @@ -164,9 +166,7 @@ fn dict_read< } /// Converts an iterator of [`DataPage`] into a single [`Array`]. -pub fn page_iter_to_array< - I: StreamingIterator>, ->( +pub fn page_iter_to_array>( iter: &mut I, metadata: &ColumnChunkMetaData, data_type: DataType, diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 962ca34d6ef..6a8caeb17d2 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use parquet2::{ encoding::{hybrid_rle, Encoding}, page::{DataPage, PrimitivePageDict}, - read::StreamingIterator, types::NativeType, + FallibleStreamingIterator, }; use super::super::utils; @@ -135,18 +135,17 @@ where ArrowError: From, T: NativeType, K: DictionaryKey, - E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut indices = MutableBuffer::::with_capacity(capacity); let mut values = MutableBuffer::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? 
{ extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), &mut indices, &mut values, diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 3ea354e656f..748c1285224 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -6,7 +6,7 @@ mod utils; use std::sync::Arc; use futures::{pin_mut, Stream, StreamExt}; -use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType}; +use parquet2::{page::DataPage, types::NativeType, FallibleStreamingIterator}; use super::nested_utils::*; use super::{ColumnChunkMetaData, ColumnDescriptor}; @@ -30,22 +30,15 @@ pub fn iter_to_array( where ArrowError: From, T: NativeType, - E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next() { - basic::extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - metadata.descriptor(), - &mut values, - &mut validity, - op, - )? + while let Some(page) = iter.next()? { + basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)? } let data_type = match data_type { @@ -114,7 +107,7 @@ where E: Clone, A: ArrowNativeType, F: Copy + Fn(T) -> A, - I: StreamingIterator>, + I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; let mut values = MutableBuffer::::with_capacity(capacity); @@ -122,9 +115,9 @@ where let (mut nested, is_nullable) = init_nested(metadata.descriptor().base_type(), capacity); - while let Some(page) = iter.next() { + while let Some(page) = iter.next()? 
{ nested::extend_from_page( - page.as_ref().map_err(|x| x.clone())?, + page, metadata.descriptor(), is_nullable, &mut nested, diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index b8803700bb5..7a672dd6fe6 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{delta_bitpacked, Encoding}, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -44,7 +44,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); @@ -77,10 +77,6 @@ pub fn array_to_page( } } - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -91,7 +87,6 @@ pub fn array_to_page( buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index cf92180a3c9..9161741c4cb 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, @@ -33,14 +33,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress( - buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -51,7 +43,6 @@ where buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index 604aa49d3cf..f9046d6d585 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::bitpacked_encode, Encoding}, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -42,7 +42,7 @@ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let validity = array.validity(); @@ -60,10 +60,6 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer)?; - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; - let statistics = if options.write_statistics { Some(build_statistics(array)) } else { @@ -74,7 +70,6 @@ pub fn array_to_page( buffer, array.len(), array.null_count(), - 
uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index b9726f93b65..427c7a05925 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where O: Offset, { @@ -32,14 +32,6 @@ where encode_plain(array, is_optional, &mut buffer)?; - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress( - buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array)) } else { @@ -50,7 +42,6 @@ where buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 8a221099f5f..d5df74e5bd6 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, metadata::ColumnDescriptor, - page::{CompressedDictPage, CompressedPage}, + page::{EncodedDictPage, EncodedPage}, write::{DynIter, WriteOptions}, }; @@ -21,7 +21,7 @@ fn encode_keys( validity: Option<&Bitmap>, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let mut buffer = vec![]; @@ -94,15 +94,10 @@ fn encode_keys( encode_u32(&mut buffer, keys, num_bits)?; } - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; - utils::build_plain_page( buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, None, @@ -110,7 +105,7 @@ fn encode_keys( options, Encoding::RleDictionary, ) - .map(CompressedPage::Data) + .map(EncodedPage::Data) } macro_rules! dyn_prim { @@ -119,9 +114,7 @@ macro_rules! 
dyn_prim { let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - let buffer = utils::compress(buffer, $options, 0)?; - - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) }}; } @@ -130,7 +123,7 @@ pub fn array_to_pages( descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result>> +) -> Result>> where PrimitiveArray: std::fmt::Display, { @@ -157,32 +150,28 @@ where let mut buffer = vec![]; utf8_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::LargeUtf8 => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; utf8_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::Binary => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; binary_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } DataType::LargeBinary => { let values = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; binary_encode_plain::(values, false, &mut buffer); - let buffer = utils::compress(buffer, options, 0)?; - CompressedPage::Dict(CompressedDictPage::new(buffer, values.len())) + EncodedDictPage::new(buffer, values.len()) } other => { return Err(ArrowError::NotYetImplemented(format!( @@ -191,6 +180,7 @@ where ))) } }; + let dict_page = EncodedPage::Dict(dict_page); // write DataPage pointing to DictPage let data_page = diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 0f64b2f5214..304a79d52f8 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -1,8 +1,7 @@ use parquet2::{ - compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{deserialize_statistics, serialize_statistics, ParquetStatistics}, write::WriteOptions, }; @@ -18,7 +17,7 @@ pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result { +) -> Result { let is_optional = is_type_nullable(descriptor.type_()); let validity = array.validity(); @@ -45,19 +44,6 @@ pub fn array_to_page( buffer.extend_from_slice(array.values()); } - let uncompressed_page_size = buffer.len(); - - let codec = create_codec(&options.compression)?; - let buffer = if let Some(mut codec) = codec { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. 
- let mut tmp = vec![]; - codec.compress(&buffer, &mut tmp)?; - tmp - } else { - buffer - }; - let statistics = if options.write_statistics { build_statistics(array, descriptor.clone()) } else { @@ -68,7 +54,6 @@ pub fn array_to_page( buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index b40fccac59e..e1a6aa6b5e9 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -12,8 +12,6 @@ mod utils; pub mod stream; -use std::sync::Arc; - use crate::array::*; use crate::bitmap::Bitmap; use crate::buffer::{Buffer, MutableBuffer}; @@ -24,13 +22,18 @@ use crate::io::parquet::write::levels::NestedInfo; use crate::types::days_ms; use crate::types::NativeType; +use parquet2::page::DataPage; pub use parquet2::{ compression::Compression, encoding::Encoding, metadata::{ColumnDescriptor, KeyValue, SchemaDescriptor}, - page::{CompressedDataPage, CompressedPage}, + page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, - write::{write_file as parquet_write_file, DynIter, RowGroupIter, Version, WriteOptions}, + write::{ + write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter, + Version, WriteOptions, + }, + FallibleStreamingIterator, }; pub use record_batch::RowGroupIterator; use schema::schema_to_metadata_key; @@ -104,13 +107,13 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) } -/// Returns an iterator of compressed pages, +/// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( - array: Arc, + array: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result>> { +) -> Result>> { match array.data_type() { DataType::Dictionary(key_type, _) => { with_match_dictionary_key_type!(key_type.as_ref(), |$T| { @@ -122,7 +125,7 @@ pub fn array_to_pages( ) }) } - _ => array_to_page(array.as_ref(), descriptor, options, encoding) + _ => array_to_page(array, descriptor, options, encoding) .map(|page| DynIter::new(std::iter::once(Ok(page)))), } } @@ -133,7 +136,7 @@ pub fn array_to_page( descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { let data_type = array.data_type(); if !can_encode(data_type, encoding) { return Err(ArrowError::InvalidArgumentError(format!( @@ -319,7 +322,7 @@ pub fn array_to_page( other ))), } - .map(CompressedPage::Data) + .map(EncodedPage::Data) } macro_rules! 
dyn_nested_prim { @@ -341,7 +344,7 @@ fn list_array_to_page( values: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { use DataType::*; let is_optional = is_type_nullable(descriptor.type_()); let nested = NestedInfo::new(offsets, validity, is_optional); @@ -420,7 +423,7 @@ fn nested_array_to_page( array: &dyn Array, descriptor: ColumnDescriptor, options: WriteOptions, -) -> Result { +) -> Result { match array.data_type() { DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index a86f246c7b9..9b9deb16d0b 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, types::NativeType, write::WriteOptions, @@ -42,7 +42,7 @@ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, descriptor: ColumnDescriptor, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, @@ -65,10 +65,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -79,7 +75,6 @@ where buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 43483a81499..5be103d08b9 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, types::NativeType, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, types::NativeType, write::WriteOptions, }; @@ -18,7 +18,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where T: ArrowNativeType, R: NativeType, @@ -38,14 +38,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress( - buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -56,7 +48,6 @@ where buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index 3af63a3d9be..b6cb940799c 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -1,6 +1,9 @@ +use parquet2::write::Compressor; +use parquet2::FallibleStreamingIterator; + use super::{ - array_to_pages, to_parquet_schema, DynIter, Encoding, RowGroupIter, SchemaDescriptor, - WriteOptions, + array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, + SchemaDescriptor, WriteOptions, }; use crate::{ datatypes::Schema, @@ -59,8 +62,16 @@ impl>> Iterator for RowGroupIterator { .into_iter() 
.zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) - .map(move |((array, type_), encoding)| { - array_to_pages(array, type_, options, encoding) + .map(move |((array, descriptor), encoding)| { + array_to_pages(array.as_ref(), descriptor, options, encoding).map( + move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }, + ) }), )) }) diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index ab1c074f213..f1e8fd3d24c 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::Encoding, metadata::ColumnDescriptor, - page::CompressedDataPage, + page::DataPage, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, write::WriteOptions, }; @@ -43,7 +43,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, encoding: Encoding, -) -> Result { +) -> Result { let validity = array.validity(); let is_optional = is_type_nullable(descriptor.type_()); @@ -76,10 +76,6 @@ pub fn array_to_page( } } - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress(buffer, options, definition_levels_byte_length)?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -90,7 +86,6 @@ pub fn array_to_page( buffer, array.len(), array.null_count(), - uncompressed_page_size, 0, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index d9b7dafbd0e..cb87fabf31f 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions, + encoding::Encoding, metadata::ColumnDescriptor, page::DataPage, write::WriteOptions, }; use super::super::{levels, utils}; @@ -15,7 +15,7 @@ pub fn array_to_page( options: WriteOptions, descriptor: ColumnDescriptor, nested: levels::NestedInfo, -) -> Result +) -> Result where OO: Offset, O: Offset, @@ -33,14 +33,6 @@ where encode_plain(array, is_optional, &mut buffer); - let uncompressed_page_size = buffer.len(); - - let buffer = utils::compress( - buffer, - options, - definition_levels_byte_length + repetition_levels_byte_length, - )?; - let statistics = if options.write_statistics { Some(build_statistics(array, descriptor.clone())) } else { @@ -51,7 +43,6 @@ where buffer, levels::num_values(nested.offsets()), array.null_count(), - uncompressed_page_size, repetition_levels_byte_length, definition_levels_byte_length, statistics, diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 8a96a3e6bff..6857bbc533f 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -1,10 +1,10 @@ use crate::bitmap::Bitmap; use parquet2::{ - compression::{create_codec, Compression}, + compression::Compression, encoding::{hybrid_rle::encode_bool, Encoding}, metadata::ColumnDescriptor, - page::{CompressedDataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, + page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, statistics::ParquetStatistics, write::WriteOptions, }; @@ -62,14 +62,13 @@ pub fn build_plain_page( buffer: Vec, len: usize, null_count: usize, - 
uncompressed_page_size: usize, repetition_levels_byte_length: usize, definition_levels_byte_length: usize, statistics: Option, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { match options.version { Version::V1 => { let header = DataPageHeader::V1(DataPageHeaderV1 { @@ -80,14 +79,7 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedDataPage::new( - header, - buffer, - options.compression, - uncompressed_page_size, - None, - descriptor, - )) + Ok(DataPage::new(header, buffer, None, descriptor)) } Version::V2 => { let header = DataPageHeader::V2(DataPageHeaderV2 { @@ -101,48 +93,11 @@ pub fn build_plain_page( statistics, }); - Ok(CompressedDataPage::new( - header, - buffer, - options.compression, - uncompressed_page_size, - None, - descriptor, - )) + Ok(DataPage::new(header, buffer, None, descriptor)) } } } -pub fn compress( - mut buffer: Vec, - options: WriteOptions, - levels_byte_length: usize, -) -> Result> { - let codec = create_codec(&options.compression)?; - Ok(if let Some(mut codec) = codec { - match options.version { - Version::V1 => { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&buffer, &mut tmp)?; - tmp - } - Version::V2 => { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&buffer[levels_byte_length..], &mut tmp)?; - buffer.truncate(levels_byte_length); - buffer.extend_from_slice(&tmp); - buffer - } - } - } else { - buffer - }) -} - /// Auxiliary iterator adapter to declare the size hint of an iterator. pub(super) struct ExactSizedIter> { iter: I, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 97a2445a0ba..e82135fd812 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1,6 +1,7 @@ use std::io::{Cursor, Read, Seek}; use std::sync::Arc; +use arrow2::error::ArrowError; use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, datatypes::*, error::Result, io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*, @@ -492,13 +493,19 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result .columns() .iter() .zip(descritors.clone()) - .map(|(array, type_)| { + .map(|(array, descriptor)| { let encoding = if let DataType::Dictionary(_, _) = array.data_type() { Encoding::RleDictionary } else { Encoding::Plain }; - array_to_pages(array.clone(), type_, options, encoding) + array_to_pages(array.as_ref(), descriptor, options, encoding).map(|pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }) }); let iterator = DynIter::new(iterator); Ok(iterator)
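
For reference, the complete write path after this change looks roughly as follows. This is a minimal sketch assembled from the hunks above (examples/parquet_write.rs and src/io/parquet/write/mod.rs): the file name and example values are illustrative, and the `write_file` argument order follows the pre-existing example, which this diff does not touch, so treat those parts as assumptions rather than part of the change.

use std::fs::File;
use std::iter::once;

use arrow2::array::Int32Array;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::{ArrowError, Result};
use arrow2::io::parquet::write::{
    array_to_pages, to_parquet_schema, write_file, Compression, Compressor, DynIter,
    DynStreamingIterator, Encoding, FallibleStreamingIterator, Version, WriteOptions,
};

fn main() -> Result<()> {
    // A single non-nullable Int32 column (illustrative values).
    let array = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
    let field = Field::new("c1", DataType::Int32, false);
    let schema = Schema::new(vec![field]);

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let encoding = Encoding::Plain;

    // Map arrow fields to parquet fields and take the single column's descriptor.
    let parquet_schema = to_parquet_schema(&schema)?;
    let descriptor = parquet_schema.columns()[0].clone();

    // Row groups -> column chunks -> (compressed) pages, as in the example above:
    // pages are encoded first (`array_to_pages`) and compressed separately (`Compressor`).
    let row_groups = once(Result::Ok(DynIter::new(once(
        array_to_pages(&array, descriptor, options, encoding).map(move |pages| {
            let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
            let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
                .map_err(ArrowError::from);
            DynStreamingIterator::new(compressed_pages)
        }),
    ))));

    let mut file = File::create("single_column.parquet")?;
    // Argument order follows the pre-existing example, unchanged by this diff.
    let _ = write_file(
        &mut file,
        row_groups,
        &schema,
        parquet_schema,
        options,
        None,
    )?;
    Ok(())
}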