diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0c5c324d008d..fa0c010ee2f6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -550,6 +550,12 @@ pub(crate) mod test_util {
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;
 
+    /// Writes `batches` to a temporary parquet file
+    ///
+    /// If multi_page is set to `true`, all batches are written into
+    /// one temporary parquet file and the parquet file is written
+    /// with 2 rows per data page (used to test page filtering and
+    /// boundaries).
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
         multi_page: bool,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 1c0ef4de7037..5784fd9e1484 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -834,82 +834,117 @@ mod tests {
         parquet_exec: Arc<ParquetExec>,
     }
 
-    /// writes each RecordBatch as an individual parquet file and re-reads
-    /// the data back. Returns the data as [RecordBatch]es
-    async fn round_trip_to_parquet(
-        batches: Vec<RecordBatch>,
+    /// round-trip record batches by writing each individual RecordBatch to
+    /// a parquet file and then reading that parquet file with the specified
+    /// options. If page_index_predicate is set to `true`, all RecordBatches
+    /// are written into a parquet file instead.
+    #[derive(Debug, Default)]
+    struct RoundTrip {
         projection: Option<Vec<usize>>,
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
-    ) -> Result<Vec<RecordBatch>> {
-        round_trip(
-            batches,
-            projection,
-            schema,
-            predicate,
-            pushdown_predicate,
-            false,
-        )
-        .await
-        .batches
+        page_index_predicate: bool,
     }
 
-    /// Writes each RecordBatch as an individual parquet file and then
-    /// reads them back. Returns the parquet exec as well as the data
-    /// as [RecordBatch]es
-    async fn round_trip(
-        batches: Vec<RecordBatch>,
-        projection: Option<Vec<usize>>,
-        schema: Option<SchemaRef>,
-        predicate: Option<Expr>,
-        pushdown_predicate: bool,
-        page_index_predicate: bool,
-    ) -> RoundTripResult {
-        let file_schema = match schema {
-            Some(schema) => schema,
-            None => Arc::new(
-                Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
-                    .unwrap(),
-            ),
-        };
+    impl RoundTrip {
+        fn new() -> Self {
+            Default::default()
+        }
 
-        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
-        let file_groups = meta.into_iter().map(Into::into).collect();
+        fn with_projection(mut self, projection: Vec<usize>) -> Self {
+            self.projection = Some(projection);
+            self
+        }
 
-        // prepare the scan
-        let mut parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![file_groups],
-                file_schema,
-                statistics: Statistics::default(),
-                projection,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: None,
-                infinite_source: false,
-            },
-            predicate,
-            None,
-        );
+        fn with_schema(mut self, schema: SchemaRef) -> Self {
+            self.schema = Some(schema);
+            self
+        }
 
-        if pushdown_predicate {
-            parquet_exec = parquet_exec
-                .with_pushdown_filters(true)
-                .with_reorder_filters(true);
+        fn with_predicate(mut self, predicate: Expr) -> Self {
+            self.predicate = Some(predicate);
+            self
         }
 
-        if page_index_predicate {
-            parquet_exec = parquet_exec.with_enable_page_index(true);
+        fn with_pushdown_predicate(mut self) -> Self {
+            self.pushdown_predicate = true;
+            self
         }
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let parquet_exec = Arc::new(parquet_exec);
-        RoundTripResult {
-            batches: collect(parquet_exec.clone(), task_ctx).await,
-            parquet_exec,
+        fn with_page_index_predicate(mut self) -> Self {
+            self.page_index_predicate = true;
+            self
+        }
+
+        /// run the test, returning only the resulting RecordBatches
+        async fn round_trip_to_batches(
+            self,
+            batches: Vec<RecordBatch>,
+        ) -> Result<Vec<RecordBatch>> {
+            self.round_trip(batches).await.batches
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
+            let Self {
+                projection,
+                schema,
+                predicate,
+                pushdown_predicate,
+                page_index_predicate,
+            } = self;
+
+            let file_schema = match schema {
+                Some(schema) => schema,
+                None => Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = page_index_predicate;
+            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let file_groups = meta.into_iter().map(Into::into).collect();
+
+            // prepare the scan
+            let mut parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: vec![file_groups],
+                    file_schema,
+                    statistics: Statistics::default(),
+                    projection,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: None,
+                    infinite_source: false,
+                },
+                predicate,
+                None,
+            );
+
+            if pushdown_predicate {
+                parquet_exec = parquet_exec
+                    .with_pushdown_filters(true)
+                    .with_reorder_filters(true);
+            }
+
+            if page_index_predicate {
+                parquet_exec = parquet_exec.with_enable_page_index(true);
+            }
+
+            let session_ctx = SessionContext::new();
+            let task_ctx = session_ctx.task_ctx();
+            let parquet_exec = Arc::new(parquet_exec);
+            RoundTripResult {
+                batches: collect(parquet_exec.clone(), task_ctx).await,
+                parquet_exec,
+            }
         }
     }
 
@@ -972,10 +1007,10 @@ mod tests {
         let batch3 = add_to_batch(&batch1, "c3", c3);
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None, false)
-                .await
-                .unwrap();
+        let read = RoundTrip::new()
+            .round_trip_to_batches(vec![batch1, batch2, batch3])
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1  | c2 | c3 |",
@@ -1014,7 +1049,8 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
+        let read = RoundTrip::new()
+            .round_trip_to_batches(vec![batch1, batch2])
             .await
             .unwrap();
         let expected = vec![
@@ -1048,7 +1084,8 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
+        let read = RoundTrip::new()
+            .round_trip_to_batches(vec![batch1, batch2])
             .await
             .unwrap();
         let expected = vec![
@@ -1084,10 +1121,11 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
-                .await
-                .unwrap();
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .round_trip_to_batches(vec![batch1, batch2])
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1  | c3 | c2 |",
@@ -1121,8 +1159,12 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
 
         // read/write them files:
-        let rt =
-            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch1, batch2])
+            .await;
+
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1160,15 +1202,11 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(
-            vec![batch1, batch2],
-            Some(vec![0, 3]),
-            None,
-            None,
-            false,
-        )
-        .await
-        .unwrap();
+        let read = RoundTrip::new()
+            .with_projection(vec![0, 3])
+            .round_trip_to_batches(vec![batch1, batch2])
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+-----+",
             "| c1  | c4  |",
@@ -1206,10 +1244,11 @@ mod tests {
         let filter = col("c3").eq(lit(0_i8));
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
-                .await
-                .unwrap();
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .round_trip_to_batches(vec![batch1, batch2])
+            .await
+            .unwrap();
 
         // Predicate should prune all row groups
         assert_eq!(read.len(), 0);
@@ -1231,10 +1270,11 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
-                .await
-                .unwrap();
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .round_trip_to_batches(vec![batch1, batch2])
+            .await
+            .unwrap();
 
         // This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
         // but parquet pruning is not exact. If the min/max values are not defined (which they are not in this case since the it is
@@ -1272,8 +1312,11 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt =
-            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch1, batch2])
+            .await;
 
         let expected = vec![
             "+----+----+",
@@ -1301,7 +1344,10 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true)
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip_to_batches(vec![batch1])
             .await
             .unwrap();
 
@@ -1345,14 +1391,10 @@ mod tests {
         ]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(
-            vec![batch1, batch2],
-            None,
-            Some(Arc::new(schema)),
-            None,
-            false,
-        )
-        .await;
+        let read = RoundTrip::new()
+            .with_schema(Arc::new(schema))
+            .round_trip_to_batches(vec![batch1, batch2])
+            .await;
         assert_contains!(read.unwrap_err().to_string(), "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8");
     }
 
@@ -1600,8 +1642,11 @@ mod tests {
 
         let filter = col("int").eq(lit(4_i32));
 
-        let rt =
-            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip(vec![batch1, batch2])
+            .await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
@@ -1636,7 +1681,11 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
        // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch1])
+            .await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
@@ -1673,7 +1722,11 @@ mod tests {
         // on
         let filter = col("c1").not_eq(lit("bar"));
 
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch1])
+            .await;
 
         // should have a pruning predicate
         let pruning_predicate = &rt.parquet_exec.pruning_predicate;
@@ -1712,8 +1765,11 @@ mod tests {
             .otherwise(lit(false))
             .unwrap();
 
-        let rt =
-            round_trip(vec![batch1], None, None, Some(filter.clone()), true, false).await;
+        let rt = RoundTrip::new()
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch1])
+            .await;
 
         // Should not contain a pruning predicate
         let pruning_predicate = &rt.parquet_exec.pruning_predicate;
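
For reference, a minimal sketch of how a test reads with the new builder. This example is not part of the diff: the test name and batch contents are made up for illustration, while `RoundTrip`, `create_batch`, `col`, and `lit` are the helpers shown in the hunks above.

    // Hypothetical test, assuming the helpers and imports of the test
    // module patched above.
    #[tokio::test]
    async fn round_trip_builder_example() {
        let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2)]));
        let batch = create_batch(vec![("c1", c1), ("c2", c2)]);

        // Every option is an explicitly named call; anything omitted keeps
        // its Default::default() value (None / false). This replaces the old
        // positional call round_trip(batches, None, None, Some(filter),
        // true, false), whose trailing booleans were easy to misread.
        let rt = RoundTrip::new()
            .with_predicate(col("c2").eq(lit(2_i64)))
            .with_pushdown_predicate()
            .round_trip(vec![batch])
            .await;

        // The full RoundTripResult still exposes the ParquetExec, so tests
        // can assert on metrics as well as on the returned batches.
        let metrics = rt.parquet_exec.metrics().unwrap();
        assert!(metrics.output_rows().is_some());
        let _batches = rt.batches.unwrap();
    }

A side benefit of the builder is that a new option (such as `page_index_predicate` here) can be added without touching every existing call site.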