diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 21a60614cff2..c5a742d50e53 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -78,7 +78,7 @@ pub struct PartitionedFile { /// /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. - pub statistics: Option, + pub statistics: Option>, /// An optional field for user defined per object metadata pub extensions: Option>, } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 17850ea7585a..566464cdfab0 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -1193,7 +1193,7 @@ mod tests { }, partition_values: vec![ScalarValue::from(file.date)], range: None, - statistics: Some(Statistics { + statistics: Some(Arc::new(Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: file @@ -1213,7 +1213,7 @@ mod tests { .unwrap_or_default() }) .collect::>(), - }), + })), extensions: None, } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 72aabefba595..dc4ae5025c11 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -354,7 +354,7 @@ impl ParquetExecBuilder { schema_adapter_factory, } = self; - let base_config = file_scan_config; + let mut base_config = file_scan_config; debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", base_config.file_groups, base_config.projection, predicate, base_config.limit); @@ -391,6 +391,12 @@ impl ParquetExecBuilder { &projected_output_ordering, &base_config, ); + + base_config + .file_groups + .iter_mut() + .for_each(|g| g.iter_mut().for_each(|f| f.statistics = None)); + ParquetExec { base_config, projected_statistics, diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 9d031a6bbc85..ddd365a0035c 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -58,7 +58,7 @@ pub async fn get_statistics_with_limit( if let Some(first_file) = all_files.next().await { let (mut file, file_stats) = first_file?; - file.statistics = Some(file_stats.as_ref().clone()); + file.statistics = Some(file_stats.clone()); result_files.push(file); // First file, we set them directly from the file statistics. @@ -83,7 +83,7 @@ pub async fn get_statistics_with_limit( if conservative_num_rows <= limit.unwrap_or(usize::MAX) { while let Some(current) = all_files.next().await { let (mut file, file_stats) = current?; - file.statistics = Some(file_stats.as_ref().clone()); + file.statistics = Some(file_stats.clone()); result_files.push(file); if !collect_stats { continue; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index bc0a19336bae..95b6eaee50b7 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -559,7 +559,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: val.range.as_ref().map(|v| v.try_into()).transpose()?, - statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?, + statistics: val + .statistics + .as_ref() + .map(|v| v.try_into().map(|v| Arc::new(v))) + .transpose()?, extensions: None, }) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 57cd22a99ae1..ae330e7421d4 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: pf.range.as_ref().map(|r| r.try_into()).transpose()?, - statistics: pf.statistics.as_ref().map(|s| s.into()), + statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()), }) } }