From ca0afb3740f6c9b9eb56fa61cea7596ab817bb78 Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 01:41:18 +0800 Subject: [PATCH 1/6] swich statistic to arc. --- benchmarks/README.md | 2 +- datafusion/core/src/datasource/listing/mod.rs | 2 +- .../core/src/datasource/physical_plan/file_scan_config.rs | 4 ++-- datafusion/core/src/datasource/statistics.rs | 4 ++-- datafusion/proto/src/physical_plan/to_proto.rs | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index afaf28bb7576..eecbffe01929 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -96,7 +96,7 @@ git checkout mybranch This produces results like: ```shell -Comparing main and mybranch +Comparing main and mybranh -------------------- Benchmark tpch.json -------------------- diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 21a60614cff2..c5a742d50e53 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -78,7 +78,7 @@ pub struct PartitionedFile { /// /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. - pub statistics: Option, + pub statistics: Option>, /// An optional field for user defined per object metadata pub extensions: Option>, } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 17850ea7585a..566464cdfab0 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -1193,7 +1193,7 @@ mod tests { }, partition_values: vec![ScalarValue::from(file.date)], range: None, - statistics: Some(Statistics { + statistics: Some(Arc::new(Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: file @@ -1213,7 +1213,7 @@ mod tests { .unwrap_or_default() }) .collect::>(), - }), + })), extensions: None, } } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 9d031a6bbc85..ddd365a0035c 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -58,7 +58,7 @@ pub async fn get_statistics_with_limit( if let Some(first_file) = all_files.next().await { let (mut file, file_stats) = first_file?; - file.statistics = Some(file_stats.as_ref().clone()); + file.statistics = Some(file_stats.clone()); result_files.push(file); // First file, we set them directly from the file statistics. @@ -83,7 +83,7 @@ pub async fn get_statistics_with_limit( if conservative_num_rows <= limit.unwrap_or(usize::MAX) { while let Some(current) = all_files.next().await { let (mut file, file_stats) = current?; - file.statistics = Some(file_stats.as_ref().clone()); + file.statistics = Some(file_stats.clone()); result_files.push(file); if !collect_stats { continue; diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 57cd22a99ae1..ae330e7421d4 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: pf.range.as_ref().map(|r| r.try_into()).transpose()?, - statistics: pf.statistics.as_ref().map(|s| s.into()), + statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()), }) } } From e814d5455d2a4674ad6f5bb6f9a4c91bc3b9b23a Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 01:44:36 +0800 Subject: [PATCH 2/6] fix readme. --- benchmarks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index eecbffe01929..b2d913dd7395 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -96,7 +96,7 @@ git checkout mybranch This produces results like: ```shell -Comparing main and mybranh +Comparing main and mybran -------------------- Benchmark tpch.json -------------------- From 67b4e49b1d68ab1139e5f3678e6de948721365f7 Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 01:55:53 +0800 Subject: [PATCH 3/6] fix pb. --- datafusion/proto/src/physical_plan/from_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index bc0a19336bae..931a272c08b1 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -559,7 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: val.range.as_ref().map(|v| v.try_into()).transpose()?, - statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?, + statistics: val.statistics.as_ref().map(|v| v.try_into().map(|v| Arc::new(v))).transpose()?, extensions: None, }) } From 1af3df62932ad30093f9c9635ff252006e6f9ed3 Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 01:57:35 +0800 Subject: [PATCH 4/6] fix readme. --- benchmarks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index b2d913dd7395..afaf28bb7576 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -96,7 +96,7 @@ git checkout mybranch This produces results like: ```shell -Comparing main and mybran +Comparing main and mybranch -------------------- Benchmark tpch.json -------------------- From 33a7a90c36131ec9e4974c0367dd87991410467c Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 02:19:49 +0800 Subject: [PATCH 5/6] fix fmt. --- datafusion/proto/src/physical_plan/from_proto.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 931a272c08b1..95b6eaee50b7 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -559,7 +559,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: val.range.as_ref().map(|v| v.try_into()).transpose()?, - statistics: val.statistics.as_ref().map(|v| v.try_into().map(|v| Arc::new(v))).transpose()?, + statistics: val + .statistics + .as_ref() + .map(|v| v.try_into().map(|v| Arc::new(v))) + .transpose()?, extensions: None, }) } From fcbfa9227d22b290af9e4e67bfc25c558a0d918c Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 9 Aug 2024 02:26:49 +0800 Subject: [PATCH 6/6] release the arc stats before executing. --- .../core/src/datasource/physical_plan/parquet/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 72aabefba595..dc4ae5025c11 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -354,7 +354,7 @@ impl ParquetExecBuilder { schema_adapter_factory, } = self; - let base_config = file_scan_config; + let mut base_config = file_scan_config; debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", base_config.file_groups, base_config.projection, predicate, base_config.limit); @@ -391,6 +391,12 @@ impl ParquetExecBuilder { &projected_output_ordering, &base_config, ); + + base_config + .file_groups + .iter_mut() + .for_each(|g| g.iter_mut().for_each(|f| f.statistics = None)); + ParquetExec { base_config, projected_statistics,