Adds option for providing a schema to the Arrow Parquet Reader.
Eric Fredine committed Jun 22, 2024
1 parent 20a569a commit d310f92
Showing 1 changed file with 194 additions and 24 deletions.
218 changes: 194 additions & 24 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -25,25 +25,24 @@ use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{FieldLevels, ProjectionMask};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader;
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

mod filter;
mod selection;

pub use crate::arrow::array_reader::RowGroups;
use crate::column::page::{PageIterator, PageReader};
use crate::file::footer;
use crate::file::page_index::index_reader;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

/// Builder for constructing parquet readers into arrow.
///
/// Most users should use one of the following specializations:
@@ -250,6 +249,8 @@ impl<T> ArrowReaderBuilder<T> {
pub struct ArrowReaderOptions {
/// Should the reader strip any user defined metadata from the Arrow schema
skip_arrow_metadata: bool,
/// If provided, used as the schema for the file; otherwise the schema is read from the file
supplied_schema: Option<SchemaRef>,
/// If true, attempt to read `OffsetIndex` and `ColumnIndex`
pub(crate) page_index: bool,
}
@@ -273,6 +274,23 @@ impl ArrowReaderOptions {
}
}

/// Provide a schema to use when reading the parquet file. If provided, it
/// takes precedence over the schema inferred from the file or the schema defined
/// in the file's metadata. If the supplied schema is not compatible with the
/// file's schema, an error will be returned when constructing the builder.
///
/// This option is only needed when you want to read columns as a different type
/// than would be inferred from the file, for example reading an Int64 column in
/// the Parquet file as a Timestamp in the Arrow schema.
///
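/// A minimal usage sketch (the file path `data.parquet`, the column name `col`,
/// and the nanosecond timestamp unit are illustrative assumptions); it mirrors
/// the tests added below:
///
/// ```no_run
/// # use std::fs::File;
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatchReader;
/// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
/// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
/// // Hypothetical file whose single Int64 column should be read as a timestamp.
/// let file = File::open("data.parquet").unwrap();
/// let supplied_schema = Arc::new(Schema::new(vec![Field::new(
///     "col",
///     DataType::Timestamp(TimeUnit::Nanosecond, None),
///     false,
/// )]));
/// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
/// let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
///     .expect("supplied schema must be compatible with the file's schema")
///     .build()
///     .unwrap();
/// // The reader reports the supplied schema rather than the inferred one.
/// assert_eq!(reader.schema(), supplied_schema);
/// ```
///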
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
supplied_schema: Some(schema),
skip_arrow_metadata: true,
..self
}
}

/// Enable reading [`PageIndex`], if present (defaults to `false`)
///
/// The `PageIndex` can be used to push down predicates to the parquet scan,
@@ -353,22 +371,46 @@ impl ArrowReaderMetadata {
/// This function does not attempt to load the PageIndex if not present in the metadata.
/// See [`Self::load`] for more details.
pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
let kv_metadata = match options.skip_arrow_metadata {
true => None,
false => metadata.file_metadata().key_value_metadata(),
};

let (schema, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
kv_metadata,
)?;
match options.supplied_schema {
Some(supplied_schema) => {
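// Convert the parquet schema to Arrow field levels, using the fields of the
// supplied schema as a hint for the target Arrow types, then verify that the
// supplied schema contains the schema inferred from that conversion.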
let parquet_schema = metadata.file_metadata().schema_descr();
let field_levels = parquet_to_arrow_field_levels(
parquet_schema,
ProjectionMask::all(),
Some(supplied_schema.fields()),
)?;
let inferred_schema = Schema::new(field_levels.fields);
if supplied_schema.contains(&inferred_schema) {
Ok(Self {
metadata,
schema: Arc::from(inferred_schema),
fields: field_levels.levels.map(Arc::new),
})
} else {
Err(ParquetError::ArrowError(
"supplied schema does not match the parquet schema".into(),
))
}
}
None => {
let kv_metadata = match options.skip_arrow_metadata {
true => None,
false => metadata.file_metadata().key_value_metadata(),
};

Ok(Self {
metadata,
schema: Arc::new(schema),
fields: fields.map(Arc::new),
})
let (schema, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
kv_metadata,
)?;

Ok(Self {
metadata,
schema: Arc::new(schema),
fields: fields.map(Arc::new),
})
}
}
}

/// Returns a reference to the [`ParquetMetaData`] for this parquet file
@@ -842,7 +884,7 @@ mod tests {
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
use arrow_select::concat::concat_batches;

use crate::arrow::arrow_reader::{
@@ -2307,10 +2349,12 @@
fn test_invalid_utf8_string_array() {
test_invalid_utf8_string_array_inner::<i32>();
}

#[test]
fn test_invalid_utf8_large_string_array() {
test_invalid_utf8_string_array_inner::<i64>();
}

fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
let cases = [
(
@@ -2620,6 +2664,132 @@ mod tests {
assert_eq!(reader.schema(), schema_without_metadata);
}

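// Writes `values` to `file` as a parquet file containing a single required
// column named "leaf" of the given physical type. Used by the schema-override tests below.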
fn gen_parquet_file<T>(values: &[T::T], file: File) -> Result<crate::format::FileMetaData>
where
T: DataType,
{
let len = match T::get_physical_type() {
crate::basic::Type::INT96 => 12,
_ => -1,
};

let fields = vec![Arc::new(
Type::primitive_type_builder("leaf", T::get_physical_type())
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::NONE)
.with_length(len)
.build()
.unwrap(),
)];

let schema = Arc::new(
Type::group_type_builder("test_schema")
.with_fields(fields)
.build()
.unwrap(),
);
let mut writer = SerializedFileWriter::new(file, schema, Default::default())?;

let mut row_group_writer = writer.next_row_group()?;
let mut column_writer = row_group_writer.next_column()?.unwrap();

column_writer.typed::<T>().write_batch(values, None, None)?;

column_writer.close()?;
row_group_writer.close()?;
writer.close()
}

fn get_builder_with_schema(
file: File,
schema: SchemaRef,
) -> Result<ParquetRecordBatchReaderBuilder<File>> {
let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
options_with_schema,
)
}

#[test]
fn test_with_schema_int64_to_timestamp() {
let values: Vec<i64> = vec![0];
let file = tempfile().unwrap();
gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
let supplied_schema = Arc::new(Schema::new(vec![Field::new(
"leaf",
ArrowDataType::Timestamp(
arrow::datatypes::TimeUnit::Nanosecond,
Some("+01:00".into()),
),
false,
)]));
let builder =
get_builder_with_schema(file.try_clone().unwrap(), supplied_schema.clone()).unwrap();

let mut arrow_reader = builder.build().unwrap();

assert_eq!(arrow_reader.schema(), supplied_schema);

let batch = arrow_reader.next().unwrap().unwrap();
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.num_rows(), 1);
assert_eq!(
batch
.column(0)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
.map(|v| v.to_string())
.unwrap(),
"1970-01-01 01:00:00 +01:00"
);
}

#[test]
fn test_schema_too_many_columns() {
let values: Vec<i64> = vec![0];
let file = tempfile().unwrap();
gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
let supplied_schema = Arc::new(Schema::new(vec![
Field::new("leaf", ArrowDataType::Int64, false),
Field::new("extra", ArrowDataType::Int32, true),
]));
let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
options_with_schema,
);

assert_eq!(
builder.err().unwrap().to_string(),
"Arrow: incompatible arrow schema, expected 1 struct fields got 2"
);
}

#[test]
fn test_schema_incompatible_column() {
let values: Vec<i64> = vec![0];
let file = tempfile().unwrap();
gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
let supplied_schema = Arc::new(Schema::new(vec![Field::new(
"leaf",
ArrowDataType::Int32,
false,
)]));
let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
options_with_schema,
);

assert_eq!(
builder.err().unwrap().to_string(),
"Arrow: supplied schema does not match the parquet schema"
);
}

#[test]
fn test_empty_projection() {
let testdata = arrow::util::test_util::parquet_test_data();