From dee926519030301f052dc2c3196e4fbef0da4c47 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 12 Apr 2024 15:12:09 +0800 Subject: [PATCH 01/10] Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune --- .../physical_plan/parquet/page_filter.rs | 18 ++++- .../core/src/physical_optimizer/pruning.rs | 47 +++++++++----- datafusion/core/tests/parquet/mod.rs | 65 ++++++++++++++++--- datafusion/core/tests/parquet/page_pruning.rs | 49 ++++++++++++++ .../core/tests/parquet/row_group_pruning.rs | 2 +- 5 files changed, 154 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index c7706f3458d0..9aee9765cd44 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -27,6 +27,7 @@ use arrow_schema::Schema; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; +use itertools::Itertools; use log::{debug, trace}; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ @@ -314,6 +315,7 @@ fn prune_pages_in_one_row_group( col_page_indexes, col_offset_indexes, target_type: &target_type, + num_rows_in_row_group: group.num_rows(), }; match predicate.prune(&pruning_stats) { @@ -385,6 +387,7 @@ struct PagesPruningStatistics<'a> { // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` target_type: &'a Option, + num_rows_in_row_group: i64, } // Extract the min or max value calling `func` from page idex @@ -548,7 +551,20 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { } fn row_counts(&self, _column: &datafusion_common::Column) -> Option { - None + // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982 + let mut first_row_index = self + .col_offset_indexes + .iter() + .map(|i| i.first_row_index) + .collect_vec(); + first_row_index.push(self.num_rows_in_row_group); + + let row_count_per_page: Vec<_> = first_row_index + .windows(2) + .map(|window| Some(window[1] - window[0])) + .collect(); + + Some(Arc::new(Int64Array::from_iter(row_count_per_page))) } fn contained( diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index ebb811408fb3..43aa8fd2fa4b 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -335,7 +335,7 @@ pub trait PruningStatistics { /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` /// `x IS NULL` | `x_null_count > 0` -/// `x IS NOT NULL` | `x_null_count = 0` +/// `x IS NOT NULL` | `!(x_null_count = row_count)` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation @@ -1241,9 +1241,11 @@ fn build_single_column_expr( /// if the column may contain null, and false if definitely does not /// contain null. /// If set `with_not` to true: which means is not null +/// because datafusion use false flag of expr result to prune unit (row group, page ..) /// Given an expression reference to `expr`, if `expr` is a column expression, /// returns a pruning expression in terms of IsNotNull that will evaluate to true -/// if the column not contain any null, and false if definitely contain null. +/// if the column may contain any non-null values, and false if definitely does not contain +/// non-null values null as all null values. fn build_is_null_column_expr( expr: &Arc, schema: &Schema, @@ -1254,26 +1256,39 @@ fn build_is_null_column_expr( let field = schema.field_with_name(col.name()).ok()?; let null_count_field = &Field::new(field.name(), DataType::UInt64, true); - required_columns - .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { - if with_not { - // IsNotNull(column) => null_count = 0 - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::Eq, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), - )) as _ - } else { + if with_not { + if let Ok(row_count_expr) = + required_columns.row_count_column_expr(col, expr, null_count_field) + { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { + // IsNotNull(column) => null_count == row_count + // but use false to prune the whole unit so need add the negate + let equal_expr = Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Eq, + row_count_expr, + )); + Arc::new(phys_expr::NotExpr::new(equal_expr)) as _ + }) + .ok() + } else { + return None; + } + } else { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { // IsNull(column) => null_count > 0 Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, Operator::Gt, Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), )) as _ - } - }) - .ok() + }) + .ok() + } } else { None } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f36afe1976b1..f90d0e8afb4c 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,7 +28,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; -use arrow_array::new_null_array; +use arrow_array::make_array; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, @@ -77,6 +77,7 @@ enum Scenario { ByteArray, PeriodsInColumnNames, WithNullValues, + WithNullValuesPageLevel, } enum Unit { @@ -632,8 +633,13 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch { RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap() } -/// Return record batch with i8, i16, i32, and i64 sequences with all Null values -fn make_all_null_values() -> RecordBatch { +/// Return record batch with i8, i16, i32, and i64 sequences with Null values +/// here 5 rows in page when using Unit::Page +fn make_int_batches_with_null( + null_values: usize, + no_null_values_start: usize, + no_null_values_end: usize, +) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("i8", DataType::Int8, true), Field::new("i16", DataType::Int16, true), @@ -641,13 +647,46 @@ fn make_all_null_values() -> RecordBatch { Field::new("i64", DataType::Int64, true), ])); + let v8: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v16: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v32: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v64: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + RecordBatch::try_new( schema, vec![ - new_null_array(&DataType::Int8, 5), - new_null_array(&DataType::Int16, 5), - new_null_array(&DataType::Int32, 5), - new_null_array(&DataType::Int64, 5), + make_array( + Int8Array::from_iter( + v8.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int16Array::from_iter( + v16.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int32Array::from_iter( + v32.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int64Array::from_iter( + v64.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), ], ) .unwrap() @@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec { } Scenario::WithNullValues => { vec![ - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), make_int_batches(1, 6), - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), + ] + } + Scenario::WithNullValuesPageLevel => { + vec![ + make_int_batches_with_null(5, 1, 6), + make_int_batches(1, 11), + make_int_batches_with_null(1, 1, 10), + make_int_batches_with_null(5, 1, 6), ] } } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index da9617f13ee9..33ccc638d088 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -871,6 +871,55 @@ async fn without_pushdown_filter() { assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); } +#[tokio::test] +// Data layout like this: +// row_group1: page1(1~5), page2(All Null) +// row_group2: page1(1~5), page2(6~10) +// row_group3: page1(1~5), page2(6~9 + Null) +// row_group4: page1(1~5), page2(All Null) +// total 40 rows +async fn test_pages_with_null_values() { + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where i8 <= 6", + Some(0), + // expect prune two pages which 10 rows + Some(10), + 22, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i16\" is not null", + Some(0), + // expect prune two pages which 10 rows + Some(10), + 29, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i32\" is null", + Some(0), + // expect prune 5 pages which 25 rows + Some(25), + 11, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i64\" > 6", + Some(0), + // 6 pages will be pruned which 30 rows + Some(30), + 7, + ) + .await; +} + fn cast_count_metric(metric: MetricValue) -> Option { match metric { MetricValue::Count { count, .. } => Some(count.value()), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 29bf1ef0a8d4..b3f1fec1753b 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() { .test_row_group_prune() .await; - // After pruning, only row group 2should be selected + // After pruning, only row group 2 should be selected RowGroupPruningTest::new() .with_scenario(Scenario::WithNullValues) .with_query("SELECT * FROM t WHERE \"i16\" is Not Null") From efcf6c35894e9d8a9d8a110ed1a7c203436f7c1d Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 12 Apr 2024 15:40:50 +0800 Subject: [PATCH 02/10] fix clippy --- datafusion/core/src/physical_optimizer/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 43aa8fd2fa4b..c0610d1a9305 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1274,7 +1274,7 @@ fn build_is_null_column_expr( }) .ok() } else { - return None; + None } } else { required_columns From 88e62fd91c41d86618018aa0cb135b5879e12d99 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 13 Apr 2024 18:34:56 +0800 Subject: [PATCH 03/10] Update datafusion/core/src/physical_optimizer/pruning.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/physical_optimizer/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index c0610d1a9305..912fa7c371b3 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -335,7 +335,7 @@ pub trait PruningStatistics { /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` /// `x IS NULL` | `x_null_count > 0` -/// `x IS NOT NULL` | `!(x_null_count = row_count)` +/// `x IS NOT NULL` | `x_null_count != row_count` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation From 641540ccf6b6a0826993bc7047e6d9ada91a22dd Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 13 Apr 2024 18:35:51 +0800 Subject: [PATCH 04/10] Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/page_pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 33ccc638d088..92c71dd7288f 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -893,7 +893,7 @@ async fn test_pages_with_null_values() { Scenario::WithNullValuesPageLevel, "SELECT * FROM t where \"i16\" is not null", Some(0), - // expect prune two pages which 10 rows + // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows Some(10), 29, ) From 8d173c6a91dfe855b07e1a96edeba239f1cb3a06 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 13 Apr 2024 18:35:59 +0800 Subject: [PATCH 05/10] Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/page_pruning.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 92c71dd7288f..b0c03e8be2b4 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -883,7 +883,8 @@ async fn test_pages_with_null_values() { Scenario::WithNullValuesPageLevel, "SELECT * FROM t where i8 <= 6", Some(0), - // expect prune two pages which 10 rows + // expect prune pages with all null or pages that have only values greater than 6 + // (row_group1, page2), (row_group4, page2) Some(10), 22, ) From 65dc157606b907673e0dcd8a483f0283ee397fea Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 13 Apr 2024 18:36:05 +0800 Subject: [PATCH 06/10] Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/page_pruning.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index b0c03e8be2b4..6c553c02ec90 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -914,7 +914,9 @@ async fn test_pages_with_null_values() { Scenario::WithNullValuesPageLevel, "SELECT * FROM t where \"i64\" > 6", Some(0), - // 6 pages will be pruned which 30 rows + // expect to prune pages where i is all null, or where always <= 5 + // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows + // Some(30), 7, ) From 75a6d0a0848685fd4d20a7d33da5344c5960b102 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 13 Apr 2024 18:36:19 +0800 Subject: [PATCH 07/10] Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/page_pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 6c553c02ec90..3948bb565381 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -904,7 +904,7 @@ async fn test_pages_with_null_values() { Scenario::WithNullValuesPageLevel, "SELECT * FROM t where \"i32\" is null", Some(0), - // expect prune 5 pages which 25 rows + // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows Some(25), 11, ) From 7b014a2a3cfb0c0d2084d8d3b732be1c40ae6bbb Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 13 Apr 2024 18:56:59 +0800 Subject: [PATCH 08/10] remove allocate vec --- .../datasource/physical_plan/parquet/page_filter.rs | 8 ++++---- datafusion/core/src/physical_optimizer/pruning.rs | 10 ++++------ datafusion/core/tests/parquet/page_pruning.rs | 1 - 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 9aee9765cd44..fca8b53da539 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -27,6 +27,7 @@ use arrow_schema::Schema; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; +use futures::stream::iter; use itertools::Itertools; use log::{debug, trace}; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; @@ -559,10 +560,9 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { .collect_vec(); first_row_index.push(self.num_rows_in_row_group); - let row_count_per_page: Vec<_> = first_row_index - .windows(2) - .map(|window| Some(window[1] - window[0])) - .collect(); + let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| { + Some(location[1].first_row_index - location[0].first_row_index) + }); Some(Arc::new(Int64Array::from_iter(row_count_per_page))) } diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 912fa7c371b3..da4895fa13d1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1240,12 +1240,10 @@ fn build_single_column_expr( /// returns a pruning expression in terms of IsNull that will evaluate to true /// if the column may contain null, and false if definitely does not /// contain null. -/// If set `with_not` to true: which means is not null -/// because datafusion use false flag of expr result to prune unit (row group, page ..) -/// Given an expression reference to `expr`, if `expr` is a column expression, -/// returns a pruning expression in terms of IsNotNull that will evaluate to true -/// if the column may contain any non-null values, and false if definitely does not contain -/// non-null values null as all null values. +/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count` +/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS +/// at least one NULL value. In this case we can know that `IS NOT NULL` can not be true and +/// thus can prune the row group / value fn build_is_null_column_expr( expr: &Arc, schema: &Schema, diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 3948bb565381..1615a1c5766a 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -916,7 +916,6 @@ async fn test_pages_with_null_values() { Some(0), // expect to prune pages where i is all null, or where always <= 5 // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows - // Some(30), 7, ) From b2d3641ba76d84b23cb11cefdf252e551835c000 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 13 Apr 2024 19:05:34 +0800 Subject: [PATCH 09/10] better way avoid allocate vec --- .../physical_plan/parquet/page_filter.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index fca8b53da539..402cc106492e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -27,8 +27,6 @@ use arrow_schema::Schema; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use futures::stream::iter; -use itertools::Itertools; use log::{debug, trace}; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ @@ -553,17 +551,17 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { fn row_counts(&self, _column: &datafusion_common::Column) -> Option { // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982 - let mut first_row_index = self - .col_offset_indexes - .iter() - .map(|i| i.first_row_index) - .collect_vec(); - first_row_index.push(self.num_rows_in_row_group); let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| { Some(location[1].first_row_index - location[0].first_row_index) }); + // append the last page row count + let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( + self.num_rows_in_row_group + - self.col_offset_indexes.last().unwrap().first_row_index, + ))); + Some(Arc::new(Int64Array::from_iter(row_count_per_page))) } From f66cce3e44defe0de8c42ef4ff78a4369ca9790c Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 13 Apr 2024 23:38:43 +0800 Subject: [PATCH 10/10] simply expr --- datafusion/core/src/physical_optimizer/pruning.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index da4895fa13d1..d8a3814d77e1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1261,14 +1261,12 @@ fn build_is_null_column_expr( required_columns .null_count_column_expr(col, expr, null_count_field) .map(|null_count_column_expr| { - // IsNotNull(column) => null_count == row_count - // but use false to prune the whole unit so need add the negate - let equal_expr = Arc::new(phys_expr::BinaryExpr::new( + // IsNotNull(column) => null_count != row_count + Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, - Operator::Eq, + Operator::NotEq, row_count_expr, - )); - Arc::new(phys_expr::NotExpr::new(equal_expr)) as _ + )) as _ }) .ok() } else {