diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 4cc7294f675d..1d56960cf162 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -31,7 +31,7 @@ use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
 use crate::arrow::ProjectionMask;
 use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::reader::FileReader;
+use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::schema::types::SchemaDescriptor;
 
 /// Arrow reader api.
@@ -145,15 +145,40 @@ impl ArrowReader for ParquetFileArrowReader {
 }
 
 impl ParquetFileArrowReader {
-    /// Create a new [`ParquetFileArrowReader`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use bytes::Bytes;
+    /// # use parquet::arrow::ParquetFileArrowReader;
+    ///
+    /// let file = File::open("file.parquet").unwrap();
+    /// let reader = ParquetFileArrowReader::try_new(file).unwrap();
+    ///
+    /// let bytes = Bytes::from(vec![]);
+    /// let reader = ParquetFileArrowReader::try_new(bytes).unwrap();
+    /// ```
+    pub fn try_new<R: ChunkReader + 'static>(chunk_reader: R) -> Result<Self> {
+        Self::try_new_with_options(chunk_reader, Default::default())
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    /// and [`ArrowReaderOptions`]
+    pub fn try_new_with_options<R: ChunkReader + 'static>(
+        chunk_reader: R,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+        Ok(Self::new_with_options(file_reader, options))
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
     pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self {
-            file_reader,
-            options: Default::default(),
-        }
+        Self::new_with_options(file_reader, Default::default())
     }
 
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
+    /// and [`ArrowReaderOptions`]
     pub fn new_with_options(
         file_reader: Arc<dyn FileReader>,
         options: ArrowReaderOptions,
@@ -369,8 +394,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_reader = arrow_reader.get_record_reader(2).unwrap();
 
         let batches = record_reader.collect::<ArrowResult<Vec<RecordBatch>>>().unwrap();
@@ -601,9 +625,8 @@ mod tests {
         let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
         for (prefix, target_precision) in file_variants {
             let path = format!("{}/{}_decimal.parquet", testdata, prefix);
-            let parquet_reader =
-                SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-            let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+            let file = File::open(&path).unwrap();
+            let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
 
             let mut record_reader = arrow_reader.get_record_reader(32).unwrap();
 
@@ -871,9 +894,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_reader = arrow_reader
             .get_record_reader(opts.record_batch_size)
             .unwrap();
@@ -1022,11 +1043,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_structs.rust.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let parquet_file_reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = parquet_file_reader.metadata().file_metadata();
-        let schema = file_metadata.schema_descr_ptr();
-
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1035,7 +1052,7 @@ mod tests {
             batch.unwrap();
         }
 
-        let mask = ProjectionMask::leaves(&schema, [3, 8, 10]);
+        let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [3, 8, 10]);
         let projected_reader = arrow_reader
             .get_record_reader_by_columns(mask.clone(), 60)
             .unwrap();
@@ -1075,9 +1092,8 @@ mod tests {
     fn test_read_maps() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_maps.snappy.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1124,14 +1140,12 @@ mod tests {
             writer.close().unwrap();
         }
 
-        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-        let file_metadata = file_reader.metadata().file_metadata();
-        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+        let mut reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(reader.parquet_schema(), [0]);
 
-        let mut batch = ParquetFileArrowReader::new(file_reader);
-        let reader = batch.get_record_reader_by_columns(mask, 1024).unwrap();
+        let reader = reader.get_record_reader_by_columns(mask, 1024).unwrap();
 
-        let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
+        let expected_schema = Schema::new(vec![Field::new(
             "group",
             ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)]),
             true,
@@ -1163,9 +1177,7 @@ mod tests {
         ];
 
         let file = Bytes::from(data);
-        let file_reader = SerializedFileReader::new(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader_by_columns(ProjectionMask::all(), 10)
             .unwrap();
@@ -1241,8 +1253,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
 
         let record_reader = arrow_reader.get_record_reader(3).unwrap();
 
@@ -1280,9 +1291,8 @@ mod tests {
     fn test_read_null_list() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/null_list.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1402,12 +1412,12 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/alltypes_plain.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = reader.metadata().file_metadata();
+
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let file_metadata = arrow_reader.metadata().file_metadata();
         let expected_rows = file_metadata.num_rows() as usize;
         let schema = file_metadata.schema_descr_ptr();
 
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let mask = ProjectionMask::leaves(&schema, []);
         let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap();
 
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index ceeddfef5d18..b02a916da8b3 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -752,8 +752,7 @@ mod tests {
         }
 
         let cursor = Bytes::from(buffer);
-        let reader = SerializedFileReader::new(cursor).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(cursor).unwrap();
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
         let actual_batch = record_batch_reader
@@ -1188,8 +1187,8 @@ mod tests {
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader =
+            ParquetFileArrowReader::try_new(file.try_clone().unwrap()).unwrap();
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
         let actual_batch = record_batch_reader
@@ -1918,10 +1917,9 @@ mod tests {
 
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file).unwrap();
-        assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]);
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[200, 200, 50]);
 
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let batches = arrow_reader
             .get_record_reader(100)
             .unwrap()
@@ -2061,13 +2059,12 @@ mod tests {
 
         writer.close().unwrap();
 
         // Read Data
-        let reader = SerializedFileReader::new(file).unwrap();
-
         // Should have written entire first batch and first row of second to the first row group
         // leaving a single row in the second row group
-        assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]);
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[6, 1]);
+
         let batches = arrow_reader
             .get_record_reader(2)
             .unwrap()
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 5a5135cd34d7..c9cc0ff6ce38 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -96,12 +96,9 @@
 //! # writer.close().unwrap();
 //!
 //! let file = File::open("data.parquet").unwrap();
-//! let file_reader = SerializedFileReader::new(file).unwrap();
 //!
-//! let file_metadata = file_reader.metadata().file_metadata();
-//! let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
-//!
-//! let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+//! let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+//! let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [0]);
 //!
 //! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
 //! println!("Arrow schema after projection is: {}",
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 5416e4078538..f3d0a3d9b36b 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -478,11 +478,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
 mod tests {
     use super::*;
 
-    use std::{collections::HashMap, convert::TryFrom, sync::Arc};
+    use std::{collections::HashMap, sync::Arc};
 
     use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
 
-    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+    use crate::file::metadata::KeyValue;
     use crate::{
         arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
         schema::{parser::parse_message_type, types::SchemaDescriptor},
@@ -571,9 +571,12 @@ mod tests {
         ];
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
 
-        let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
-                .unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
+            &parquet_schema,
+            ProjectionMask::all(),
+            None,
+        )
+        .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
     }
 
@@ -1599,13 +1602,13 @@ mod tests {
         writer.close()?;
 
         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);
 
         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);
 
         Ok(())
@@ -1668,13 +1671,13 @@ mod tests {
         writer.close()?;
 
         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);
 
         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);
 
         Ok(())
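Taken together, the patch collapses the old two-step construction (build a `SerializedFileReader`, then wrap it in `ParquetFileArrowReader::new`) into a single fallible constructor accepting any `ChunkReader`, and lets callers obtain the parquet schema from the reader itself via `parquet_schema()`. Below is a minimal sketch of the resulting call pattern, assuming a local `data.parquet` as in the module docs; the file name and batch size are illustrative and not part of the patch.

```rust
use std::fs::File;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};

fn main() {
    // Any type implementing ChunkReader works here, e.g. File or bytes::Bytes.
    let file = File::open("data.parquet").unwrap();

    // One-step construction; the SerializedFileReader is created internally.
    let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();

    // The parquet schema now comes from the arrow reader itself, so no
    // separate FileReader handle is needed to build a projection.
    let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [0]);

    let batch_reader = arrow_reader
        .get_record_reader_by_columns(mask, 1024)
        .unwrap();

    for batch in batch_reader {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```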