Skip to content

Commit

Permalink
Fix issue in filter pushdown with overloaded projection index (#4281)
Browse files Browse the repository at this point in the history
* Fix issue in filter pushdown with overloaded projection index

* Use BTreeSet for collecting distinct column indexes
  • Loading branch information
thinkharderdev authored Nov 18, 2022
1 parent 78401df commit 09e1c91
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ mod tests {
// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

let filter = col("c2").eq(lit(2_i64));
let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));

// read/write them files:
let rt =
Expand All @@ -998,13 +998,14 @@ mod tests {
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
"| | 10 | 1 |",
"| | 20 | 2 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across three batches)
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 4);
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
use std::collections::BTreeSet;

use datafusion_expr::Expr;
use datafusion_optimizer::utils::split_conjunction_owned;
Expand Down Expand Up @@ -174,7 +175,7 @@ struct FilterCandidateBuilder<'a> {
expr: Expr,
file_schema: &'a Schema,
table_schema: &'a Schema,
required_column_indices: Vec<usize>,
required_column_indices: BTreeSet<usize>,
non_primitive_columns: bool,
projected_columns: bool,
}
Expand All @@ -185,7 +186,7 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
file_schema,
table_schema,
required_column_indices: vec![],
required_column_indices: BTreeSet::default(),
non_primitive_columns: false,
projected_columns: false,
}
Expand All @@ -209,7 +210,7 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
required_bytes,
can_use_index,
projection: self.required_column_indices,
projection: self.required_column_indices.into_iter().collect(),
}))
}
}
Expand All @@ -219,7 +220,7 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
if let Expr::Column(column) = expr {
if let Ok(idx) = self.file_schema.index_of(&column.name) {
self.required_column_indices.push(idx);
self.required_column_indices.insert(idx);

if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
Expand Down Expand Up @@ -284,7 +285,10 @@ fn remap_projection(src: &[usize]) -> Vec<usize> {
/// Calculate the total compressed size of all `Column's required for
/// predicate `Expr`. This should represent the total amount of file IO
/// required to evaluate the predicate.
fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
fn size_of_columns(
columns: &BTreeSet<usize>,
metadata: &ParquetMetaData,
) -> Result<usize> {
let mut total_size = 0;
let row_groups = metadata.row_groups();
for idx in columns {
Expand All @@ -299,7 +303,10 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
/// For a given set of `Column`s required for predicate `Expr` determine whether all
/// columns are sorted. Sorted columns may be queried more efficiently in the presence of
/// a PageIndex.
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
fn columns_sorted(
    _columns: &BTreeSet<usize>,
    _metadata: &ParquetMetaData,
) -> Result<bool> {
    // TODO How do we know this?
    // Until sortedness can be derived from the file metadata, conservatively
    // report the columns as unsorted so no PageIndex fast path is assumed.
    let sorted = false;
    Ok(sorted)
}
Expand Down

0 comments on commit 09e1c91

Please sign in to comment.