From f90780da2ef3a82ba63f306e7f49b0ffe61ca374 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 30 Oct 2024 12:08:29 -0400 Subject: [PATCH] Apply projection to `Statistics` in `FilterExec` --- datafusion/common/src/stats.rs | 20 ++++++++ datafusion/physical-plan/src/filter.rs | 7 ++- .../sqllogictest/test_files/parquet.slt | 48 +++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d8e62b3045f9..1c774a95d0e8 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -258,6 +258,26 @@ impl Statistics { self } + /// Project the statistics to the given column indices. + /// + /// For example, if we had statistics for columns `{"a", "b", "c"}`, + /// projecting to `vec![2, 1]` would return statistics for columns `{"c", + /// "b"}`. + pub fn project(mut self, projection: Option<&Vec>) -> Self { + let Some(projection) = projection else { + return self; + }; + + // todo: it would be nice to avoid cloning column statistics if + // possible (e.g. if the projection did not contain duplicates) + self.column_statistics = projection + .iter() + .map(|&i| self.column_statistics[i].clone()) + .collect(); + + self + } + /// Calculates the statistics after `fetch` and `skip` operations apply. /// Here, `self` denotes per-partition statistics. Use the `n_partitions` /// parameter to compute global statistics in a multi-partition setting. diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 417d2098b083..100a1eecffe7 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -370,7 +370,12 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn statistics(&self) -> Result { - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) + let stats = Self::statistics_helper( + &self.input, + self.predicate(), + self.default_selectivity, + )?; + Ok(stats.project(self.projection.as_ref())) } } diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index f8b163adc796..031eb9f0ff38 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -348,3 +348,51 @@ DROP TABLE list_columns; # Clean up statement ok DROP TABLE listing_table; + +## Tests for https://github.com/apache/datafusion/issues/13186 +statement ok +create table cpu (time timestamp, usage_idle float, usage_user float, cpu int); + +statement ok +insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3); + +# must put it into a parquet file to get statistics +statement ok +copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet'; + +# Run queries against parquet files +statement ok +create external table cpu_parquet +stored as parquet +location 'test_files/scratch/parquet/cpu.parquet'; + +# Double filtering +# +# Expect 1 row for both queries +query PI +select time, rn +from ( + select time, row_number() OVER (ORDER BY usage_idle, time) as rn + from cpu + where cpu = 3 +) where rn > 0; +---- +1970-01-01T00:00:00 1 + +query PI +select time, rn +from ( + select time, row_number() OVER (ORDER BY usage_idle, time) as rn + from cpu_parquet + where cpu = 3 +) where rn > 0; +---- +1970-01-01T00:00:00 1 + + +# Clean up +statement ok +drop table cpu; + +statement ok +drop table cpu_parquet;