Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema (#5135)

* Parquet: clear metadata of ParquetRecordBatchStream::schema

* Revert "Parquet: clear metadata of ParquetRecordBatchStream::schema"

This reverts commit 84be336.

* Document expected behaviour

* Revert "Document expected behaviour"

This reverts commit ef9601e.

* Reapply "Parquet: clear metadata of ParquetRecordBatchStream::schema"

This reverts commit fd662ad.

* ParquetRecordBatchStream should strip schema metadata and respect projection

* Fix projection of nested fields
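In practice, the change makes `ParquetRecordBatchStream::schema` agree with the batches the stream actually emits. A minimal sketch of the observable behaviour, assuming an async context and a hypothetical `file` (a tokio::fs::File of Parquet data whose Arrow schema carries custom key/value metadata and at least two leaf columns):

use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

// `file` is the assumed handle described above; `?` presumes an enclosing
// async fn returning a parquet Result.
let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

// The builder schema still reflects the file as written: all fields, metadata intact.
assert!(!builder.schema().metadata().is_empty());

// Project down to the first leaf column only.
let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
let stream = builder.with_projection(mask).build()?;

// After this change, the stream schema matches the RecordBatches it will
// yield: projected fields only, metadata stripped.
assert_eq!(stream.schema().fields().len(), 1);
assert!(stream.schema().metadata().is_empty());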
Jefffrey authored Dec 5, 2023
1 parent f16d2f5 commit a36bf7a
Showing 2 changed files with 135 additions and 5 deletions.
4 changes: 4 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
}

impl RecordBatchReader for ParquetRecordBatchReader {
    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// Note that the schema metadata will be stripped here. See
    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
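For comparison on the synchronous side, a minimal sketch of how the two schemas now relate, assuming a hypothetical `file` (a std::fs::File of Parquet data written with schema metadata) and no projection:

use arrow_array::RecordBatchReader; // brings the trait's `schema` method into scope
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// `?` presumes an enclosing fn returning a parquet Result.
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

// Builder schema: the schema as stored in the file, metadata included.
let builder_schema = builder.schema().clone();

let reader = builder.build()?;

// Reader schema: what the emitted RecordBatches will carry; the same fields
// here (no projection was set), but with the metadata stripped.
let reader_schema = reader.schema();
assert!(reader_schema.metadata().is_empty());
assert_eq!(reader_schema.fields(), builder_schema.fields());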
136 changes: 131 additions & 5 deletions parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
@@ -385,13 +385,24 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
            offset: self.offset,
        };

        // Ensure schema of ParquetRecordBatchStream respects projection, and does
        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
-            schema: self.schema,
+            schema,
            reader: Some(reader),
            state: StreamState::Init,
        })
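The `filter_leaves` call above is what makes nested projection work: leaves are numbered depth-first across the whole schema, and a parent struct survives whenever any of its children does. A standalone sketch of that behaviour (field names here are illustrative only):

use arrow_schema::{DataType, Field, Fields};

// Schema sketch: a: Int32, b: UInt64, c: Struct<d: Utf8, e: Utf8>.
// Depth-first leaf numbering gives a = 0, b = 1, c.d = 2, c.e = 3.
let nested = Fields::from(vec![
    Field::new("d", DataType::Utf8, true),
    Field::new("e", DataType::Utf8, true),
]);
let fields = Fields::from(vec![
    Field::new("a", DataType::Int32, true),
    Field::new("b", DataType::UInt64, true),
    Field::new("c", DataType::Struct(nested), true),
]);

// Keep leaves 0 and 2: "a" survives, "b" is dropped, and "c" is retained
// but narrowed to contain only its projected child "d".
let projected = fields.filter_leaves(|idx, _| idx == 0 || idx == 2);
assert_eq!(projected.len(), 2);
assert_eq!(projected[0].name(), "a");
assert_eq!(projected[1].name(), "c");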
Expand Down Expand Up @@ -572,7 +583,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
}

impl<T> ParquetRecordBatchStream<T> {
/// Returns the [`SchemaRef`] for this parquet file
/// Returns the projected [`SchemaRef`] for reading the parquet file.
///
/// Note that the schema metadata will be stripped here. See
/// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
@@ -855,11 +869,15 @@ mod tests {
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array};
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+        StructArray, UInt64Array,
+    };
    use arrow_schema::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use rand::{thread_rng, Rng};
-    use std::sync::Mutex;
+    use std::collections::HashMap;
+    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

    #[derive(Clone)]
@@ -1584,6 +1602,114 @@ mod tests {
        test_get_row_group_column_bloom_filter(data, false).await;
    }

    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.all_fields().iter().map(|f| f.name()).collect()
        }

        // ParquetRecordBatchReaderBuilder::schema differs from
        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
        // schema contents (in terms of custom metadata attached to the schema, and
        // of which fields are returned). This test ensures that behaviour remains
        // consistent, and that the asynchronous versions behave the same way.

        // Prep data: a schema with nested fields and custom metadata
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write parquet with custom metadata in schema
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices in mask, expected names of all fields in the output schema)
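        // Leaves are numbered depth-first (a = 0, b = 1, c.d = 2, c.e = 3), and
        // projecting a nested leaf keeps its parent struct, which is why the
        // [0, 1, 2] mask below yields "d" but not "e".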
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        // Ensure we're consistent for each of these projections
        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                // Builder schema should preserve all fields and metadata
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                // Reader & batch schemas should show only projected fields, and no metadata
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // The asynchronous reader should behave the same
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        // convert to new parquet file with bloom_filter_length
