From d310f92d11f6559ff5c961c594137273875f4dc2 Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Thu, 20 Jun 2024 16:32:00 -0700
Subject: [PATCH] Adds option for providing a schema to the Arrow Parquet
 Reader.

---
 parquet/src/arrow/arrow_reader/mod.rs | 218 +++++++++++++++++++++++---
 1 file changed, 194 insertions(+), 24 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 070dda97c59a..95012d40e8f6 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -25,25 +25,24 @@ use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
+pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
+pub use selection::{RowSelection, RowSelector};
 
+pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
-use crate::arrow::{FieldLevels, ProjectionMask};
+use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
+use crate::file::footer;
 use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::index_reader;
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
 mod filter;
 mod selection;
 
-pub use crate::arrow::array_reader::RowGroups;
-use crate::column::page::{PageIterator, PageReader};
-use crate::file::footer;
-use crate::file::page_index::index_reader;
-pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
-
 /// Builder for constructing parquet readers into arrow.
 ///
 /// Most users should use one of the following specializations:
@@ -250,6 +249,8 @@ impl<T> ArrowReaderBuilder<T> {
 pub struct ArrowReaderOptions {
     /// Should the reader strip any user defined metadata from the Arrow schema
     skip_arrow_metadata: bool,
+    /// If provided, used as the schema for the file; otherwise the schema is read from the file
+    supplied_schema: Option<SchemaRef>,
     /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
     pub(crate) page_index: bool,
 }
@@ -273,6 +274,23 @@ impl ArrowReaderOptions {
         }
     }
 
+    /// Provide a schema to use when reading the parquet file. If provided, it
+    /// takes precedence over the schema inferred from the file or the schema defined
+    /// in the file's metadata. If the schema is not compatible with the file's
+    /// schema, an error will be returned when constructing the builder.
+    ///
+    /// This option is only needed if you want to cast columns to a different type,
+    /// for example casting an Int64 in the Parquet file to a Timestamp in the
+    /// Arrow schema.
+    ///
+    pub fn with_schema(self, schema: SchemaRef) -> Self {
+        Self {
+            supplied_schema: Some(schema),
+            skip_arrow_metadata: true,
+            ..self
+        }
+    }
+
     /// Enable reading [`PageIndex`], if present (defaults to `false`)
    ///
     /// The `PageIndex` can be used to push down predicates to the parquet scan,
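A minimal downstream-user sketch of the `with_schema` option added above; it mirrors the `test_with_schema_int64_to_timestamp` test later in this patch. The path `data.parquet` and the column name `leaf` are assumptions for illustration:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;

    // Read the parquet INT64 column "leaf" as a timezone-aware timestamp.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "leaf",
        DataType::Timestamp(TimeUnit::Nanosecond, Some("+01:00".into())),
        false,
    )]));

    let options = ArrowReaderOptions::new().with_schema(schema.clone());
    let mut reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;

    // Every batch now carries the supplied schema instead of the inferred one.
    while let Some(batch) = reader.next().transpose()? {
        assert_eq!(batch.schema(), schema);
    }
    Ok(())
}
```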
@@ -353,22 +371,46 @@ impl ArrowReaderMetadata {
     /// This function does not attempt to load the PageIndex if not present in the metadata.
     /// See [`Self::load`] for more details.
     pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
-        let kv_metadata = match options.skip_arrow_metadata {
-            true => None,
-            false => metadata.file_metadata().key_value_metadata(),
-        };
-
-        let (schema, fields) = parquet_to_arrow_schema_and_fields(
-            metadata.file_metadata().schema_descr(),
-            ProjectionMask::all(),
-            kv_metadata,
-        )?;
+        match options.supplied_schema {
+            Some(supplied_schema) => {
+                let parquet_schema = metadata.file_metadata().schema_descr();
+                let field_levels = parquet_to_arrow_field_levels(
+                    parquet_schema,
+                    ProjectionMask::all(),
+                    Some(supplied_schema.fields()),
+                )?;
+                let inferred_schema = Schema::new(field_levels.fields);
+                if supplied_schema.contains(&inferred_schema) {
+                    Ok(Self {
+                        metadata,
+                        schema: Arc::from(inferred_schema),
+                        fields: field_levels.levels.map(Arc::new),
+                    })
+                } else {
+                    Err(ParquetError::ArrowError(
+                        "supplied schema does not match the parquet schema".into(),
+                    ))
+                }
+            }
+            None => {
+                let kv_metadata = match options.skip_arrow_metadata {
+                    true => None,
+                    false => metadata.file_metadata().key_value_metadata(),
+                };
 
-        Ok(Self {
-            metadata,
-            schema: Arc::new(schema),
-            fields: fields.map(Arc::new),
-        })
+                let (schema, fields) = parquet_to_arrow_schema_and_fields(
+                    metadata.file_metadata().schema_descr(),
+                    ProjectionMask::all(),
+                    kv_metadata,
+                )?;
+
+                Ok(Self {
+                    metadata,
+                    schema: Arc::new(schema),
+                    fields: fields.map(Arc::new),
+                })
+            }
+        }
     }
 
     /// Returns a reference to the [`ParquetMetaData`] for this parquet file
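The acceptance rule in `try_new` above reduces to `arrow_schema::Schema::contains`: the supplied schema must contain the schema that is re-inferred from the parquet file with the supplied fields passed as a hint to `parquet_to_arrow_field_levels`. A self-contained sketch of that containment check, using only `arrow_schema` with illustrative field values:

```rust
use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn main() {
    let supplied = Schema::new(vec![Field::new(
        "leaf",
        DataType::Timestamp(TimeUnit::Nanosecond, Some("+01:00".into())),
        false,
    )]);

    // When re-inference (steered by the supplied fields) yields the same
    // field, the supplied schema contains it and the builder accepts it.
    let inferred = Schema::new(vec![Field::new(
        "leaf",
        DataType::Timestamp(TimeUnit::Nanosecond, Some("+01:00".into())),
        false,
    )]);
    assert!(supplied.contains(&inferred));

    // A field whose data type differs from what inference can produce is not
    // contained; this is the "does not match" error path above.
    let inferred_mismatch = Schema::new(vec![Field::new("leaf", DataType::Int64, false)]);
    assert!(!supplied.contains(&inferred_mismatch));
}
```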
@@ -842,7 +884,7 @@ mod tests {
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
     use arrow_data::ArrayDataBuilder;
-    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema};
+    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
     use arrow_select::concat::concat_batches;
 
     use crate::arrow::arrow_reader::{
@@ -2307,10 +2349,12 @@ fn test_invalid_utf8_string_array() {
         test_invalid_utf8_string_array_inner::<i32>();
     }
+
     #[test]
     fn test_invalid_utf8_large_string_array() {
         test_invalid_utf8_string_array_inner::<i64>();
     }
+
     fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
         let cases = [
             (
@@ -2620,6 +2664,132 @@
         assert_eq!(reader.schema(), schema_without_metadata);
     }
 
+    fn gen_parquet_file<T>(values: &[T::T], file: File) -> Result<crate::format::FileMetaData>
+    where
+        T: DataType,
+    {
+        let len = match T::get_physical_type() {
+            crate::basic::Type::INT96 => 12,
+            _ => -1,
+        };
+
+        let fields = vec![Arc::new(
+            Type::primitive_type_builder("leaf", T::get_physical_type())
+                .with_repetition(Repetition::REQUIRED)
+                .with_converted_type(ConvertedType::NONE)
+                .with_length(len)
+                .build()
+                .unwrap(),
+        )];
+
+        let schema = Arc::new(
+            Type::group_type_builder("test_schema")
+                .with_fields(fields)
+                .build()
+                .unwrap(),
+        );
+        let mut writer = SerializedFileWriter::new(file, schema, Default::default())?;
+
+        let mut row_group_writer = writer.next_row_group()?;
+        let mut column_writer = row_group_writer.next_column()?.unwrap();
+
+        column_writer.typed::<T>().write_batch(values, None, None)?;
+
+        column_writer.close()?;
+        row_group_writer.close()?;
+        writer.close()
+    }
+
+    fn get_builder_with_schema(
+        file: File,
+        schema: SchemaRef,
+    ) -> Result<ParquetRecordBatchReaderBuilder<File>> {
+        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
+        ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        )
+    }
+
+    #[test]
+    fn test_with_schema_int64_to_timestamp() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
+            "leaf",
+            ArrowDataType::Timestamp(
+                arrow::datatypes::TimeUnit::Nanosecond,
+                Some("+01:00".into()),
+            ),
+            false,
+        )]));
+        let builder =
+            get_builder_with_schema(file.try_clone().unwrap(), supplied_schema.clone()).unwrap();
+
+        let mut arrow_reader = builder.build().unwrap();
+
+        assert_eq!(arrow_reader.schema(), supplied_schema);
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
+                .map(|v| v.to_string())
+                .unwrap(),
+            "1970-01-01 01:00:00 +01:00"
+        );
+    }
+
+    #[test]
+    fn test_schema_too_many_columns() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![
+            Field::new("leaf", ArrowDataType::Int64, false),
+            Field::new("extra", ArrowDataType::Int32, true),
+        ]));
+        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        );
+
+        assert_eq!(
+            builder.err().unwrap().to_string(),
+            "Arrow: incompatible arrow schema, expected 1 struct fields got 2"
+        );
+    }
+
+    #[test]
+    fn test_schema_incompatible_column() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
+            "leaf",
+            ArrowDataType::Int32,
+            false,
+        )]));
+        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        );
+
+        assert_eq!(
+            builder.err().unwrap().to_string(),
+            "Arrow: supplied schema does not match the parquet schema"
+        );
+    }
+
     #[test]
     fn test_empty_projection() {
         let testdata = arrow::util::test_util::parquet_test_data();
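Both rejection paths exercised by the tests above surface as `ParquetError::ArrowError` when the builder is constructed, before any rows are read. A hedged sketch of reacting to that error from user code; the helper `schema_is_accepted` and the path `my.parquet` are hypothetical:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::errors::ParquetError;

// Returns true if the builder accepts `schema` for the given file.
fn schema_is_accepted(schema: Arc<Schema>) -> bool {
    let file = File::open("my.parquet").expect("open file");
    let options = ArrowReaderOptions::new().with_schema(schema);
    match ParquetRecordBatchReaderBuilder::try_new_with_options(file, options) {
        Ok(_builder) => true,
        // Schema problems are reported here, at build time; the tests above
        // assert the full Display strings, e.g.
        //   "Arrow: incompatible arrow schema, expected 1 struct fields got 2"
        //   "Arrow: supplied schema does not match the parquet schema"
        Err(ParquetError::ArrowError(msg)) => {
            eprintln!("supplied schema rejected: {msg}");
            false
        }
        Err(e) => panic!("unexpected error: {e}"),
    }
}

fn main() {
    // Mirrors `test_schema_incompatible_column`: an Int32 field cannot stand
    // in for a parquet INT64 column named "leaf".
    let schema = Arc::new(Schema::new(vec![Field::new("leaf", DataType::Int32, false)]));
    assert!(!schema_is_accepted(schema));
}
```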